diff --git a/Cargo.lock b/Cargo.lock
index 7c714004..4b70a68d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3970,12 +3970,9 @@ dependencies = [
  "clap 4.2.4",
  "const_env",
  "dashmap",
- "dotenv",
  "futures",
  "jsonrpsee",
- "lazy_static",
  "log",
- "prometheus",
  "quinn",
  "rustls 0.20.6",
  "serde",
@@ -3991,7 +3988,6 @@ dependencies = [
  "solana-version",
  "thiserror",
  "tokio",
- "tokio-postgres",
  "tracing-subscriber",
 ]
 
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 18d4d673..93c7b13d 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -31,10 +31,6 @@ dashmap = { workspace = true }
 const_env = { workspace = true }
 jsonrpsee = { workspace = true }
 tracing-subscriber = { workspace = true }
-tokio-postgres = { workspace = true }
-prometheus = { workspace = true }
-lazy_static = { workspace = true }
-dotenv = { workspace = true }
 async-channel = { workspace = true }
 quinn = { workspace = true }
 chrono = { workspace = true }
diff --git a/core/src/leader_schedule.rs b/core/src/leader_schedule.rs
index 787a1de3..0d3dacc7 100644
--- a/core/src/leader_schedule.rs
+++ b/core/src/leader_schedule.rs
@@ -1,16 +1,12 @@
-use std::{
-    sync::Arc,
-    collections::VecDeque, str::FromStr,
-};
+use std::{collections::VecDeque, str::FromStr, sync::Arc};
 
 use dashmap::DashMap;
 use log::warn;
 use solana_rpc_client::nonblocking::rpc_client::RpcClient;
 use solana_rpc_client_api::response::RpcContactInfo;
-use solana_sdk::{slot_history::Slot, pubkey::Pubkey};
+use solana_sdk::{pubkey::Pubkey, slot_history::Slot};
 use tokio::sync::RwLock;
-
 
 pub struct LeaderData {
     pub contact_info: Arc<RpcContactInfo>,
     pub leader_slot: Slot,
@@ -23,11 +19,11 @@ pub struct LeaderSchedule {
 
 impl LeaderSchedule {
     pub fn new(leaders_to_cache_count: usize) -> Self {
-        Self {
-            leader_schedule: RwLock::new(VecDeque::new()),
+        Self {
+            leader_schedule: RwLock::new(VecDeque::new()),
             leaders_to_cache_count,
             cluster_nodes: Arc::new(DashMap::new()),
-        }
+        }
     }
 
     pub async fn len(&self) -> usize {
@@ -48,7 +44,12 @@ impl LeaderSchedule {
         Ok(())
     }
 
-    pub async fn update_leader_schedule(&self, rpc_client: Arc<RpcClient>, current_slot: u64, estimated_slot: u64) -> anyhow::Result<()> {
+    pub async fn update_leader_schedule(
+        &self,
+        rpc_client: Arc<RpcClient>,
+        current_slot: u64,
+        estimated_slot: u64,
+    ) -> anyhow::Result<()> {
         let (queue_begin_slot, queue_end_slot) = {
             let mut leader_queue = self.leader_schedule.write().await;
             // remove old leaders
@@ -107,4 +108,4 @@ impl LeaderSchedule {
         }
         next_leaders
     }
-}
\ No newline at end of file
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 139219de..84f3a4b0 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -1,5 +1,5 @@
+pub mod leader_schedule;
 pub mod quic_connection_utils;
 pub mod rotating_queue;
 pub mod solana_utils;
 pub mod structures;
-pub mod leader_schedule;
\ No newline at end of file
diff --git a/core/src/quic_connection_utils.rs b/core/src/quic_connection_utils.rs
index 5a695a34..5a2d5ac8 100644
--- a/core/src/quic_connection_utils.rs
+++ b/core/src/quic_connection_utils.rs
@@ -1,9 +1,12 @@
 use log::{trace, warn};
-use quinn::{Connection, ConnectionError, Endpoint, SendStream, EndpointConfig, TokioRuntime, ClientConfig, TransportConfig, IdleTimeout};
+use quinn::{
+    ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
+    TokioRuntime, TransportConfig,
+};
 use solana_sdk::pubkey::Pubkey;
 use std::{
     collections::VecDeque,
-    net::{SocketAddr, IpAddr, Ipv4Addr},
+    net::{IpAddr, Ipv4Addr, SocketAddr},
     sync::{
         atomic::{AtomicBool, AtomicU64, Ordering},
         Arc,
@@ -17,7 +20,6 @@ const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
 pub struct QuicConnectionUtils {}
 
 impl QuicConnectionUtils {
-
     pub fn create_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint {
         let mut endpoint = {
             let client_socket =
@@ -50,7 +52,7 @@ impl QuicConnectionUtils {
 
         endpoint
     }
-    
+
     pub async fn make_connection(
         endpoint: Endpoint,
         addr: SocketAddr,
@@ -86,6 +88,7 @@ impl QuicConnectionUtils {
         Ok(connection)
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub async fn connect(
         identity: Pubkey,
         already_connected: bool,
@@ -100,8 +103,7 @@ impl QuicConnectionUtils {
         let conn = if already_connected {
             Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout).await
         } else {
-            let conn = Self::make_connection(endpoint.clone(), addr, connection_timeout).await;
-            conn
+            Self::make_connection(endpoint.clone(), addr, connection_timeout).await
         };
         match conn {
             Ok(conn) => {
@@ -184,6 +186,7 @@
         }
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub async fn send_transaction_batch(
         connection: Arc<RwLock<Connection>>,
         txs: Vec<Vec<u8>>,
diff --git a/core/src/solana_utils.rs b/core/src/solana_utils.rs
index 5fdf55d5..61113dcc 100644
--- a/core/src/solana_utils.rs
+++ b/core/src/solana_utils.rs
@@ -1,15 +1,29 @@
-use std::{collections::HashMap, sync::Arc};
-use log::info;
-use solana_rpc_client::nonblocking::rpc_client::{RpcClient};
+use crate::structures::identity_stakes::IdentityStakes;
+use futures::StreamExt;
+use log::{error, info, warn};
+use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
+use solana_rpc_client::nonblocking::rpc_client::RpcClient;
 use solana_sdk::pubkey::Pubkey;
 use solana_streamer::nonblocking::quic::ConnectionPeerType;
-use crate::structures::identity_stakes::IdentityStakes;
+use std::{
+    collections::HashMap,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
+use tokio::sync::mpsc::UnboundedReceiver;
 
-pub struct SolanaUtils {
-}
+const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
+
+pub struct SolanaUtils {}
 
 impl SolanaUtils {
-    pub async fn get_stakes_for_identity(rpc_client: Arc<RpcClient>, identity: Pubkey)-> anyhow::Result<IdentityStakes> {
+    pub async fn get_stakes_for_identity(
+        rpc_client: Arc<RpcClient>,
+        identity: Pubkey,
+    ) -> anyhow::Result<IdentityStakes> {
         let vote_accounts = rpc_client.get_vote_accounts().await?;
         let map_of_stakes: HashMap<String, u64> = vote_accounts
             .current
@@ -44,4 +58,112 @@ impl SolanaUtils {
             Ok(IdentityStakes::default())
         }
     }
-}
\ No newline at end of file
+
+    pub async fn poll_slots(
+        rpc_client: Arc<RpcClient>,
+        pubsub_client: Arc<PubsubClient>,
+        update_slot: impl Fn(u64),
+    ) {
+        loop {
+            let slot = rpc_client
+                .get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
+                    commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
+                })
+                .await;
+            match slot {
+                Ok(slot) => {
+                    update_slot(slot);
+                }
+                Err(e) => {
+                    // error getting slot
+                    error!("error getting slot {}", e);
+                    tokio::time::sleep(Duration::from_millis(10)).await;
+                    continue;
+                }
+            }
+
+            let res =
+                tokio::time::timeout(Duration::from_millis(1000), pubsub_client.slot_subscribe())
+                    .await;
+            match res {
+                Ok(sub_res) => {
+                    match sub_res {
+                        Ok((mut client, unsub)) => {
+                            loop {
+                                let next = tokio::time::timeout(
+                                    Duration::from_millis(2000),
+                                    client.next(),
+                                )
+                                .await;
+                                match next {
+                                    Ok(slot_info) => {
+                                        if let Some(slot_info) = slot_info {
+                                            update_slot(slot_info.slot);
+                                        }
+                                    }
+                                    Err(_) => {
+                                        // timed out, reconnect to pubsub
+                                        warn!("slot pubsub disconnected, reconnecting");
+                                        break;
+                                    }
+                                }
+                            }
+                            unsub();
+                        }
+                        Err(e) => {
+                            warn!("slot pubsub disconnected ({}), reconnecting", e);
+                        }
+                    }
+                }
+                Err(_) => {
+                    // timed out
+                    warn!("timed out subscribing to slots");
+                }
+            }
+        }
+    }
+
+    // Estimates the slot, either from the polled slot or by forcefully updating it after every 400ms
+    // Returns whether the estimated slot was updated or not
+    pub async fn slot_estimator(
+        slot_update_notifier: &mut UnboundedReceiver<u64>,
+        current_slot: Arc<AtomicU64>,
+        estimated_slot: Arc<AtomicU64>,
+    ) -> bool {
+        match tokio::time::timeout(
+            Duration::from_millis(AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS),
+            slot_update_notifier.recv(),
+        )
+        .await
+        {
+            Ok(recv) => {
+                if let Some(slot) = recv {
+                    if slot > estimated_slot.load(Ordering::Relaxed) {
+                        // in case of multiple slot update events, take the current slot
+                        let current_slot = current_slot.load(Ordering::Relaxed);
+                        estimated_slot.store(current_slot, Ordering::Relaxed);
+                        true
+                    } else {
+                        // the queue is late; the estimated slot is already ahead
+                        false
+                    }
+                } else {
+                    false
+                }
+            }
+            Err(_) => {
+                // force update the slot
+                let es = estimated_slot.load(Ordering::Relaxed);
+                let cs = current_slot.load(Ordering::Relaxed);
+                // estimated slot should not go ahead more than 32 slots
+                // this is because it may be a slot block
+                if es < cs + 32 {
+                    estimated_slot.fetch_add(1, Ordering::Relaxed);
+                    true
+                } else {
+                    false
+                }
+            }
+        }
+    }
+}
diff --git a/core/src/structures/identity_stakes.rs b/core/src/structures/identity_stakes.rs
index 74d28e63..2b1194cd 100644
--- a/core/src/structures/identity_stakes.rs
+++ b/core/src/structures/identity_stakes.rs
@@ -19,4 +19,4 @@ impl Default for IdentityStakes {
             min_stakes: 0,
         }
     }
-}
\ No newline at end of file
+}
diff --git a/core/src/structures/mod.rs b/core/src/structures/mod.rs
index 1edf0cc8..75c029b6 100644
--- a/core/src/structures/mod.rs
+++ b/core/src/structures/mod.rs
@@ -1 +1 @@
-pub mod identity_stakes;
\ No newline at end of file
+pub mod identity_stakes;
diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs
index 4700c54e..cf794a2c 100644
--- a/services/src/tpu_utils/tpu_connection_manager.rs
+++ b/services/src/tpu_utils/tpu_connection_manager.rs
@@ -1,24 +1,23 @@
+use crate::tx_sender::TxProps;
+use dashmap::DashMap;
+use log::{error, trace};
+use prometheus::{core::GenericGauge, opts, register_int_gauge};
+use quinn::{Connection, Endpoint};
+use solana_lite_rpc_core::{
+    quic_connection_utils::QuicConnectionUtils, rotating_queue::RotatingQueue,
+    structures::identity_stakes::IdentityStakes,
+};
+use solana_sdk::pubkey::Pubkey;
+use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
 use std::{
     collections::HashMap,
-    net::{SocketAddr},
+    net::SocketAddr,
     sync::{
         atomic::{AtomicBool, AtomicU64, Ordering},
         Arc,
     },
     time::Duration,
 };
-use crate::tx_sender::TxProps;
-use dashmap::DashMap;
-use log::{error, trace};
-use prometheus::{core::GenericGauge, opts, register_int_gauge};
-use quinn::{
-    Connection, Endpoint
-};
-use solana_lite_rpc_core::{
-    quic_connection_utils::QuicConnectionUtils, rotating_queue::RotatingQueue, structures::identity_stakes::IdentityStakes
-};
-use solana_sdk::pubkey::Pubkey;
-use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
 use tokio::sync::{broadcast::Receiver, broadcast::Sender, RwLock};
 
 pub const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
@@ -303,4 +302,4 @@ impl TpuConnectionManager {
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs
index 33989c75..b75c5711 100644
--- a/services/src/tpu_utils/tpu_service.rs
+++ b/services/src/tpu_utils/tpu_service.rs
@@ -1,19 +1,17 @@
 use anyhow::Result;
 use dashmap::DashMap;
-use futures::StreamExt;
-use log::{error, info, warn};
+use log::{error, info};
 use prometheus::{core::GenericGauge, opts, register_int_gauge};
-use solana_client::{
-    nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient},
-};
+use solana_client::nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient};
 
-use solana_lite_rpc_core::{structures::identity_stakes::IdentityStakes, solana_utils::SolanaUtils, leader_schedule::LeaderSchedule};
+use solana_lite_rpc_core::{
+    leader_schedule::LeaderSchedule, solana_utils::SolanaUtils,
+    structures::identity_stakes::IdentityStakes,
+};
 use solana_sdk::{
     pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, signer::Signer, slot_history::Slot,
 };
-use solana_streamer::{
-    tls_certificates::new_self_signed_tls_certificate,
-};
+use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
 use std::{
     net::{IpAddr, Ipv4Addr},
     str::FromStr,
@@ -34,7 +32,6 @@ use crate::tx_sender::TxProps;
 
 const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and contact info of next 1024 leaders in the queue
 const CLUSTERINFO_REFRESH_TIME: u64 = 60 * 60; // stakes every 1hrs
 const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s
-const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
 const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
 
 lazy_static::lazy_static! {
@@ -106,7 +103,11 @@ impl TpuService {
         // update stakes for the identity
         {
             let mut lock = self.identity_stakes.write().await;
-            *lock = SolanaUtils::get_stakes_for_identity(self.rpc_client.clone(), self.identity.pubkey()).await?;
+            *lock = SolanaUtils::get_stakes_for_identity(
+                self.rpc_client.clone(),
+                self.identity.pubkey(),
+            )
+            .await?;
         }
         Ok(())
     }
@@ -119,7 +120,9 @@ impl TpuService {
     pub async fn update_leader_schedule(&self) -> Result<()> {
         let current_slot = self.current_slot.load(Ordering::Relaxed);
         let estimated_slot = self.estimated_slot.load(Ordering::Relaxed);
-        self.leader_schedule.update_leader_schedule(self.rpc_client.clone(), current_slot, estimated_slot).await?;
+        self.leader_schedule
+            .update_leader_schedule(self.rpc_client.clone(), current_slot, estimated_slot)
+            .await?;
         NB_OF_LEADERS_IN_SCHEDULE.set(self.leader_schedule.len().await as i64);
         NB_CLUSTER_NODES.set(self.leader_schedule.cluster_nodes_len() as i64);
         Ok(())
@@ -164,78 +167,28 @@ impl TpuService {
     }
 
     async fn update_current_slot(&self, update_notifier: tokio::sync::mpsc::UnboundedSender<u64>) {
+        let current_slot = self.current_slot.clone();
+
         let update_slot = |slot: u64| {
-            if slot > self.current_slot.load(Ordering::Relaxed) {
-                self.current_slot.store(slot, Ordering::Relaxed);
+            if slot > current_slot.load(Ordering::Relaxed) {
+                current_slot.store(slot, Ordering::Relaxed);
                 CURRENT_SLOT.set(slot as i64);
                 let _ = update_notifier.send(slot);
             }
         };
-        loop {
-            let slot = self
-                .rpc_client
-                .get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
-                    commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
-                })
-                .await;
-            match slot {
-                Ok(slot) => {
-                    update_slot(slot);
-                }
-                Err(e) => {
-                    // error getting slot
-                    error!("error getting slot {}", e);
-                    tokio::time::sleep(Duration::from_millis(10)).await;
-                    continue;
-                }
-            }
-
-            let res = tokio::time::timeout(
-                Duration::from_millis(1000),
-                self.pubsub_client.slot_subscribe(),
-            )
-            .await;
-            match res {
-                Ok(sub_res) => {
-                    match sub_res {
-                        Ok((mut client, unsub)) => {
-                            loop {
-                                let next = tokio::time::timeout(
-                                    Duration::from_millis(2000),
-                                    client.next(),
-                                )
-                                .await;
-                                match next {
-                                    Ok(slot_info) => {
-                                        if let Some(slot_info) = slot_info {
-                                            update_slot(slot_info.slot);
-                                        }
-                                    }
-                                    Err(_) => {
-                                        // timedout reconnect to pubsub
-                                        warn!("slot pub sub disconnected reconnecting");
-                                        break;
-                                    }
-                                }
-                            }
-                            unsub();
-                        }
-                        Err(e) => {
-                            warn!("slot pub sub disconnected ({}) reconnecting", e);
-                        }
-                    }
-                }
-                Err(_) => {
-                    // timed out
-                    warn!("timedout subscribing to slots");
-                }
-            }
-        }
+        SolanaUtils::poll_slots(
+            self.rpc_client.clone(),
+            self.pubsub_client.clone(),
+            update_slot,
+        )
+        .await;
     }
 
     pub async fn start(&self) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
-        self.leader_schedule.load_cluster_info(self.rpc_client.clone()).await?;
+        self.leader_schedule
+            .load_cluster_info(self.rpc_client.clone())
+            .await?;
         self.update_current_stakes().await?;
         self.update_leader_schedule().await?;
         self.update_quic_connections().await;
@@ -274,47 +227,16 @@
         let current_slot = self.current_slot.clone();
         let this = self.clone();
         let estimated_slot_calculation = tokio::spawn(async move {
-            // this is an estimated slot. we get the current slot and if we do not recieve any notification in 400ms we update it manually
-            let mut slot_reciever = slot_reciever;
+            let mut slot_update_notifier = slot_reciever;
             loop {
-                let update_connections = match tokio::time::timeout(
-                    Duration::from_millis(AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS),
-                    slot_reciever.recv(),
+                if SolanaUtils::slot_estimator(
+                    &mut slot_update_notifier,
+                    current_slot.clone(),
+                    estimated_slot.clone(),
                 )
                 .await
                 {
-                    Ok(recv) => {
-                        if let Some(slot) = recv {
-                            if slot > estimated_slot.load(Ordering::Relaxed) {
-                                // incase of multilple slot update events / take the current slot
-                                let current_slot = current_slot.load(Ordering::Relaxed);
-                                estimated_slot.store(current_slot, Ordering::Relaxed);
-                                ESTIMATED_SLOT.set(current_slot as i64);
-                                true
-                            } else {
-                                // queue is late estimate slot is already ahead
-                                false
-                            }
-                        } else {
-                            false
-                        }
-                    }
-                    Err(_) => {
-                        let es = estimated_slot.load(Ordering::Relaxed);
-                        let cs = current_slot.load(Ordering::Relaxed);
-                        // estimated slot should not go ahead more than 32 slots
-                        // this is because it may be a slot block
-                        if es < cs + 32 {
-                            estimated_slot.fetch_add(1, Ordering::Relaxed);
-                            ESTIMATED_SLOT.set((es + 1) as i64);
-                            true
-                        } else {
-                            false
-                        }
-                    }
-                };
-
-                if update_connections {
+                    ESTIMATED_SLOT.set(estimated_slot.load(Ordering::Relaxed) as i64);
                     this.update_quic_connections().await;
                 }
             }
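// ---------------------------------------------------------------------------
// A minimal, self-contained sketch of the slot-estimation scheme this diff
// extracts into `SolanaUtils::slot_estimator`. `estimate_slot` below is a
// hypothetical standalone rendering of that helper, assuming only tokio
// (features "macros", "rt", "time", "sync"); the 400ms average slot time and
// the 32-slot cap are taken from the diff itself.
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;

// Wait up to one average slot time for a polled slot update. If one arrives
// and is ahead of the estimate, snap the estimate back to the polled current
// slot; if nothing arrives, optimistically bump the estimate by one slot, but
// never more than 32 slots past the last polled slot. Returns whether the
// estimate changed (the caller uses this to refresh QUIC connections).
async fn estimate_slot(
    slot_update_notifier: &mut UnboundedReceiver<u64>,
    current_slot: &AtomicU64,
    estimated_slot: &AtomicU64,
) -> bool {
    match tokio::time::timeout(
        Duration::from_millis(AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS),
        slot_update_notifier.recv(),
    )
    .await
    {
        Ok(Some(slot)) if slot > estimated_slot.load(Ordering::Relaxed) => {
            // a real update arrived: re-anchor the estimate on the polled slot
            estimated_slot.store(current_slot.load(Ordering::Relaxed), Ordering::Relaxed);
            true
        }
        // channel closed, or the estimate is already ahead of the update
        Ok(_) => false,
        Err(_) => {
            // timed out: assume one slot elapsed, capped at 32 slots ahead
            let es = estimated_slot.load(Ordering::Relaxed);
            let cs = current_slot.load(Ordering::Relaxed);
            if es < cs + 32 {
                estimated_slot.fetch_add(1, Ordering::Relaxed);
                true
            } else {
                false
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel();
    let current = AtomicU64::new(100);
    let estimated = AtomicU64::new(100);

    // no notification within 400ms: the estimate advances on its own
    assert!(estimate_slot(&mut rx, &current, &estimated).await);
    assert_eq!(estimated.load(Ordering::Relaxed), 101);

    // a polled update ahead of the estimate snaps it back to the current slot
    current.store(105, Ordering::Relaxed);
    tx.send(105u64).unwrap();
    assert!(estimate_slot(&mut rx, &current, &estimated).await);
    assert_eq!(estimated.load(Ordering::Relaxed), 105);
}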