moved code from TPU service and TPU connection to core, format and clippy

This commit is contained in:
Godmode Galactus 2023-06-08 11:27:02 +02:00
parent 00f2b39f52
commit 4fc05f062d
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
10 changed files with 201 additions and 162 deletions

4
Cargo.lock generated
View File

@ -3970,12 +3970,9 @@ dependencies = [
"clap 4.2.4",
"const_env",
"dashmap",
"dotenv",
"futures",
"jsonrpsee",
"lazy_static",
"log",
"prometheus",
"quinn",
"rustls 0.20.6",
"serde",
@ -3991,7 +3988,6 @@ dependencies = [
"solana-version",
"thiserror",
"tokio",
"tokio-postgres",
"tracing-subscriber",
]

View File

@ -31,10 +31,6 @@ dashmap = { workspace = true }
const_env = { workspace = true }
jsonrpsee = { workspace = true }
tracing-subscriber = { workspace = true }
tokio-postgres = { workspace = true }
prometheus = { workspace = true }
lazy_static = { workspace = true }
dotenv = { workspace = true }
async-channel = { workspace = true }
quinn = { workspace = true }
chrono = { workspace = true }

View File

@ -1,16 +1,12 @@
use std::{
sync::Arc,
collections::VecDeque, str::FromStr,
};
use std::{collections::VecDeque, str::FromStr, sync::Arc};
use dashmap::DashMap;
use log::warn;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::response::RpcContactInfo;
use solana_sdk::{slot_history::Slot, pubkey::Pubkey};
use solana_sdk::{pubkey::Pubkey, slot_history::Slot};
use tokio::sync::RwLock;
pub struct LeaderData {
pub contact_info: Arc<RpcContactInfo>,
pub leader_slot: Slot,
@ -23,11 +19,11 @@ pub struct LeaderSchedule {
impl LeaderSchedule {
pub fn new(leaders_to_cache_count: usize) -> Self {
Self {
leader_schedule: RwLock::new(VecDeque::new()),
Self {
leader_schedule: RwLock::new(VecDeque::new()),
leaders_to_cache_count,
cluster_nodes: Arc::new(DashMap::new()),
}
}
}
pub async fn len(&self) -> usize {
@ -48,7 +44,12 @@ impl LeaderSchedule {
Ok(())
}
pub async fn update_leader_schedule(&self, rpc_client: Arc<RpcClient>, current_slot: u64, estimated_slot: u64) -> anyhow::Result<()> {
pub async fn update_leader_schedule(
&self,
rpc_client: Arc<RpcClient>,
current_slot: u64,
estimated_slot: u64,
) -> anyhow::Result<()> {
let (queue_begin_slot, queue_end_slot) = {
let mut leader_queue = self.leader_schedule.write().await;
// remove old leaders
@ -107,4 +108,4 @@ impl LeaderSchedule {
}
next_leaders
}
}
}

View File

@ -1,5 +1,5 @@
pub mod leader_schedule;
pub mod quic_connection_utils;
pub mod rotating_queue;
pub mod solana_utils;
pub mod structures;
pub mod leader_schedule;

View File

@ -1,9 +1,12 @@
use log::{trace, warn};
use quinn::{Connection, ConnectionError, Endpoint, SendStream, EndpointConfig, TokioRuntime, ClientConfig, TransportConfig, IdleTimeout};
use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
TokioRuntime, TransportConfig,
};
use solana_sdk::pubkey::Pubkey;
use std::{
collections::VecDeque,
net::{SocketAddr, IpAddr, Ipv4Addr},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
@ -17,7 +20,6 @@ const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
pub struct QuicConnectionUtils {}
impl QuicConnectionUtils {
pub fn create_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint {
let mut endpoint = {
let client_socket =
@ -50,7 +52,7 @@ impl QuicConnectionUtils {
endpoint
}
pub async fn make_connection(
endpoint: Endpoint,
addr: SocketAddr,
@ -86,6 +88,7 @@ impl QuicConnectionUtils {
Ok(connection)
}
#[allow(clippy::too_many_arguments)]
pub async fn connect(
identity: Pubkey,
already_connected: bool,
@ -100,8 +103,7 @@ impl QuicConnectionUtils {
let conn = if already_connected {
Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout).await
} else {
let conn = Self::make_connection(endpoint.clone(), addr, connection_timeout).await;
conn
Self::make_connection(endpoint.clone(), addr, connection_timeout).await
};
match conn {
Ok(conn) => {
@ -184,6 +186,7 @@ impl QuicConnectionUtils {
}
}
#[allow(clippy::too_many_arguments)]
pub async fn send_transaction_batch(
connection: Arc<RwLock<Connection>>,
txs: Vec<Vec<u8>>,

View File

@ -1,15 +1,29 @@
use std::{collections::HashMap, sync::Arc};
use log::info;
use solana_rpc_client::nonblocking::rpc_client::{RpcClient};
use crate::structures::identity_stakes::IdentityStakes;
use futures::StreamExt;
use log::{error, info, warn};
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::ConnectionPeerType;
use crate::structures::identity_stakes::IdentityStakes;
use std::{
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tokio::sync::mpsc::UnboundedReceiver;
pub struct SolanaUtils {
}
const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
pub struct SolanaUtils {}
impl SolanaUtils {
pub async fn get_stakes_for_identity(rpc_client: Arc<RpcClient>, identity: Pubkey)-> anyhow::Result<IdentityStakes> {
pub async fn get_stakes_for_identity(
rpc_client: Arc<RpcClient>,
identity: Pubkey,
) -> anyhow::Result<IdentityStakes> {
let vote_accounts = rpc_client.get_vote_accounts().await?;
let map_of_stakes: HashMap<String, u64> = vote_accounts
.current
@ -44,4 +58,112 @@ impl SolanaUtils {
Ok(IdentityStakes::default())
}
}
}
pub async fn poll_slots(
rpc_client: Arc<RpcClient>,
pubsub_client: Arc<PubsubClient>,
update_slot: impl Fn(u64),
) {
loop {
let slot = rpc_client
.get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
})
.await;
match slot {
Ok(slot) => {
update_slot(slot);
}
Err(e) => {
// error getting slot
error!("error getting slot {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
}
let res =
tokio::time::timeout(Duration::from_millis(1000), pubsub_client.slot_subscribe())
.await;
match res {
Ok(sub_res) => {
match sub_res {
Ok((mut client, unsub)) => {
loop {
let next = tokio::time::timeout(
Duration::from_millis(2000),
client.next(),
)
.await;
match next {
Ok(slot_info) => {
if let Some(slot_info) = slot_info {
update_slot(slot_info.slot);
}
}
Err(_) => {
// timedout reconnect to pubsub
warn!("slot pub sub disconnected reconnecting");
break;
}
}
}
unsub();
}
Err(e) => {
warn!("slot pub sub disconnected ({}) reconnecting", e);
}
}
}
Err(_) => {
// timed out
warn!("timedout subscribing to slots");
}
}
}
}
// Estimates the slots, either from polled slot or by forcefully updating after every 400ms
// returns if the estimated slot was updated or not
pub async fn slot_estimator(
slot_update_notifier: &mut UnboundedReceiver<u64>,
current_slot: Arc<AtomicU64>,
estimated_slot: Arc<AtomicU64>,
) -> bool {
match tokio::time::timeout(
Duration::from_millis(AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS),
slot_update_notifier.recv(),
)
.await
{
Ok(recv) => {
if let Some(slot) = recv {
if slot > estimated_slot.load(Ordering::Relaxed) {
// incase of multilple slot update events / take the current slot
let current_slot = current_slot.load(Ordering::Relaxed);
estimated_slot.store(current_slot, Ordering::Relaxed);
true
} else {
// queue is late estimate slot is already ahead
false
}
} else {
false
}
}
Err(_) => {
// force update the slot
let es = estimated_slot.load(Ordering::Relaxed);
let cs = current_slot.load(Ordering::Relaxed);
// estimated slot should not go ahead more than 32 slots
// this is because it may be a slot block
if es < cs + 32 {
estimated_slot.fetch_add(1, Ordering::Relaxed);
true
} else {
false
}
}
}
}
}

View File

@ -19,4 +19,4 @@ impl Default for IdentityStakes {
min_stakes: 0,
}
}
}
}

View File

@ -1 +1 @@
pub mod identity_stakes;
pub mod identity_stakes;

View File

@ -1,24 +1,23 @@
use crate::tx_sender::TxProps;
use dashmap::DashMap;
use log::{error, trace};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{Connection, Endpoint};
use solana_lite_rpc_core::{
quic_connection_utils::QuicConnectionUtils, rotating_queue::RotatingQueue,
structures::identity_stakes::IdentityStakes,
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{
collections::HashMap,
net::{SocketAddr},
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use crate::tx_sender::TxProps;
use dashmap::DashMap;
use log::{error, trace};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{
Connection, Endpoint
};
use solana_lite_rpc_core::{
quic_connection_utils::QuicConnectionUtils, rotating_queue::RotatingQueue, structures::identity_stakes::IdentityStakes
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use tokio::sync::{broadcast::Receiver, broadcast::Sender, RwLock};
pub const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
@ -303,4 +302,4 @@ impl TpuConnectionManager {
}
}
}
}
}

View File

@ -1,19 +1,17 @@
use anyhow::Result;
use dashmap::DashMap;
use futures::StreamExt;
use log::{error, info, warn};
use log::{error, info};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_client::{
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient},
};
use solana_client::nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient};
use solana_lite_rpc_core::{structures::identity_stakes::IdentityStakes, solana_utils::SolanaUtils, leader_schedule::LeaderSchedule};
use solana_lite_rpc_core::{
leader_schedule::LeaderSchedule, solana_utils::SolanaUtils,
structures::identity_stakes::IdentityStakes,
};
use solana_sdk::{
pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, signer::Signer, slot_history::Slot,
};
use solana_streamer::{
tls_certificates::new_self_signed_tls_certificate,
};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
@ -34,7 +32,6 @@ use crate::tx_sender::TxProps;
const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and contact info of next 1024 leaders in the queue
const CLUSTERINFO_REFRESH_TIME: u64 = 60 * 60; // stakes every 1hrs
const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s
const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
lazy_static::lazy_static! {
@ -106,7 +103,11 @@ impl TpuService {
// update stakes for the identity
{
let mut lock = self.identity_stakes.write().await;
*lock = SolanaUtils::get_stakes_for_identity(self.rpc_client.clone(), self.identity.pubkey()).await?;
*lock = SolanaUtils::get_stakes_for_identity(
self.rpc_client.clone(),
self.identity.pubkey(),
)
.await?;
}
Ok(())
}
@ -119,7 +120,9 @@ impl TpuService {
pub async fn update_leader_schedule(&self) -> Result<()> {
let current_slot = self.current_slot.load(Ordering::Relaxed);
let estimated_slot = self.estimated_slot.load(Ordering::Relaxed);
self.leader_schedule.update_leader_schedule(self.rpc_client.clone(), current_slot, estimated_slot).await?;
self.leader_schedule
.update_leader_schedule(self.rpc_client.clone(), current_slot, estimated_slot)
.await?;
NB_OF_LEADERS_IN_SCHEDULE.set(self.leader_schedule.len().await as i64);
NB_CLUSTER_NODES.set(self.leader_schedule.cluster_nodes_len() as i64);
Ok(())
@ -164,78 +167,28 @@ impl TpuService {
}
async fn update_current_slot(&self, update_notifier: tokio::sync::mpsc::UnboundedSender<u64>) {
let current_slot = self.current_slot.clone();
let update_slot = |slot: u64| {
if slot > self.current_slot.load(Ordering::Relaxed) {
self.current_slot.store(slot, Ordering::Relaxed);
if slot > current_slot.load(Ordering::Relaxed) {
current_slot.store(slot, Ordering::Relaxed);
CURRENT_SLOT.set(slot as i64);
let _ = update_notifier.send(slot);
}
};
loop {
let slot = self
.rpc_client
.get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
})
.await;
match slot {
Ok(slot) => {
update_slot(slot);
}
Err(e) => {
// error getting slot
error!("error getting slot {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
}
let res = tokio::time::timeout(
Duration::from_millis(1000),
self.pubsub_client.slot_subscribe(),
)
.await;
match res {
Ok(sub_res) => {
match sub_res {
Ok((mut client, unsub)) => {
loop {
let next = tokio::time::timeout(
Duration::from_millis(2000),
client.next(),
)
.await;
match next {
Ok(slot_info) => {
if let Some(slot_info) = slot_info {
update_slot(slot_info.slot);
}
}
Err(_) => {
// timedout reconnect to pubsub
warn!("slot pub sub disconnected reconnecting");
break;
}
}
}
unsub();
}
Err(e) => {
warn!("slot pub sub disconnected ({}) reconnecting", e);
}
}
}
Err(_) => {
// timed out
warn!("timedout subscribing to slots");
}
}
}
SolanaUtils::poll_slots(
self.rpc_client.clone(),
self.pubsub_client.clone(),
update_slot,
)
.await;
}
pub async fn start(&self) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
self.leader_schedule.load_cluster_info(self.rpc_client.clone()).await?;
self.leader_schedule
.load_cluster_info(self.rpc_client.clone())
.await?;
self.update_current_stakes().await?;
self.update_leader_schedule().await?;
self.update_quic_connections().await;
@ -274,47 +227,16 @@ impl TpuService {
let current_slot = self.current_slot.clone();
let this = self.clone();
let estimated_slot_calculation = tokio::spawn(async move {
// this is an estimated slot. we get the current slot and if we do not recieve any notification in 400ms we update it manually
let mut slot_reciever = slot_reciever;
let mut slot_update_notifier = slot_reciever;
loop {
let update_connections = match tokio::time::timeout(
Duration::from_millis(AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS),
slot_reciever.recv(),
if SolanaUtils::slot_estimator(
&mut slot_update_notifier,
current_slot.clone(),
estimated_slot.clone(),
)
.await
{
Ok(recv) => {
if let Some(slot) = recv {
if slot > estimated_slot.load(Ordering::Relaxed) {
// incase of multilple slot update events / take the current slot
let current_slot = current_slot.load(Ordering::Relaxed);
estimated_slot.store(current_slot, Ordering::Relaxed);
ESTIMATED_SLOT.set(current_slot as i64);
true
} else {
// queue is late estimate slot is already ahead
false
}
} else {
false
}
}
Err(_) => {
let es = estimated_slot.load(Ordering::Relaxed);
let cs = current_slot.load(Ordering::Relaxed);
// estimated slot should not go ahead more than 32 slots
// this is because it may be a slot block
if es < cs + 32 {
estimated_slot.fetch_add(1, Ordering::Relaxed);
ESTIMATED_SLOT.set((es + 1) as i64);
true
} else {
false
}
}
};
if update_connections {
ESTIMATED_SLOT.set(estimated_slot.load(Ordering::Relaxed) as i64);
this.update_quic_connections().await;
}
}