Merge pull request #150 from blockworks-foundation/restart_and_service_metrics

counters
galactus 2023-06-23 09:14:32 +02:00 committed by GitHub
commit 999df3aea2
14 changed files with 320 additions and 309 deletions

View File

@ -49,4 +49,4 @@ quinn = "0.9.3"
rustls = { version = "=0.20.8", default-features = false }
solana-lite-rpc-services = {path = "services", version="0.2.1"}
solana-lite-rpc-core = {path = "core", version="0.2.1"}
async-trait = "0.1.68"
async-trait = "0.1.68"

View File

@ -1,7 +1,7 @@
use crate::structures::identity_stakes::IdentityStakes;
use anyhow::Context;
use anyhow::{bail, Context};
use futures::StreamExt;
use log::{info, warn};
use log::info;
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
@ -18,7 +18,7 @@ use tokio::sync::mpsc::UnboundedReceiver;
const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
pub struct SolanaUtils {}
pub struct SolanaUtils;
impl SolanaUtils {
pub async fn get_stakes_for_identity(
@ -61,7 +61,7 @@ impl SolanaUtils {
}
pub async fn poll_slots(
rpc_client: Arc<RpcClient>,
rpc_client: &RpcClient,
rpc_ws_address: &str,
update_slot: impl Fn(u64),
) -> anyhow::Result<()> {
@ -74,15 +74,15 @@ impl SolanaUtils {
commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
})
.await
.context("error getting slot")?;
.context("Error getting slot")?;
update_slot(slot);
let (mut client, unsub) =
tokio::time::timeout(Duration::from_millis(1000), pubsub_client.slot_subscribe())
.await
.context("timedout subscribing to slots")?
.context("slot pub sub disconnected")?;
.context("Timeout subscribing to slots")?
.context("Slot pub sub disconnected")?;
while let Ok(slot_info) =
tokio::time::timeout(Duration::from_millis(2000), client.next()).await
@ -92,10 +92,9 @@ impl SolanaUtils {
}
}
warn!("slot pub sub disconnected reconnecting");
unsub();
Ok(())
bail!("Slot pub sub disconnected")
}
// Estimates the slots, either from polled slot or by forcefully updating after every 400ms
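
The comment above describes the estimation strategy: trust a freshly polled slot when one arrives, otherwise forcefully advance the estimate after roughly one slot time (400ms). A minimal sketch of that idea, with a simplified signature that is not the actual SolanaUtils::slot_estimator:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedReceiver;

// Sketch only: wait up to ~400ms for a polled slot; if none arrives, assume one slot elapsed.
async fn estimate_slot(
    slot_updates: &mut UnboundedReceiver<u64>,
    current_slot: &Arc<AtomicU64>,
    estimated_slot: &Arc<AtomicU64>,
) {
    match tokio::time::timeout(Duration::from_millis(400), slot_updates.recv()).await {
        Ok(Some(slot)) => {
            // a real slot came from the poller; take it as both current and estimated
            current_slot.store(slot, Ordering::Relaxed);
            estimated_slot.fetch_max(slot, Ordering::Relaxed);
        }
        _ => {
            // no update within the average slot time: forcefully bump the estimate by one
            estimated_slot.fetch_add(1, Ordering::Relaxed);
        }
    }
}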

View File

@ -57,6 +57,18 @@ lazy_static::lazy_static! {
register_int_counter!(opts!("literpc_rpc_airdrop", "RPC call to request airdrop")).unwrap();
static ref RPC_SIGNATURE_SUBSCRIBE: IntCounter =
register_int_counter!(opts!("literpc_rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap();
static ref WS_SERVER_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_ws_server_fail", "WebSocket server failed")).unwrap();
static ref METRICS_SERVICE_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_metrics_service_fail", "Metrics service failed")).unwrap();
static ref HTTP_SERVER_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_http_server_fail", "Http server failed")).unwrap();
static ref PROMETHEUS_SERVER_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_prometheus_server_fail", "Prometheus server failed")).unwrap();
static ref POSTGRES_SERVICE_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_postgres_service_fail", "Postgres service failed")).unwrap();
static ref TX_SERVICE_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_tx_service_fail", "Tx service failed")).unwrap();
}
/// A bridge between clients and tpu
@ -147,19 +159,21 @@ impl LiteBridge {
};
let metrics_capture = MetricsCapture::new(self.tx_store.clone()).capture();
let prometheus_sync = PrometheusSync.sync(prometheus_addr);
let prometheus_sync = PrometheusSync::sync(prometheus_addr);
let max_retries = self.max_retries;
// transaction services
let (transaction_service, jh_transaction_services) = self
.transaction_service_builder
.clone()
.start(
postgres_send,
self.block_store.clone(),
max_retries,
self.max_retries,
clean_interval,
)
.await;
);
self.transaction_service = Some(transaction_service);
let rpc = self.into_rpc();
let (ws_server, http_server) = {
@ -201,22 +215,28 @@ impl LiteBridge {
tokio::select! {
res = ws_server => {
bail!("WebSocket server exited unexpectedly {res:?}");
WS_SERVER_FAIL.inc();
bail!("WebSocket server {res:?}");
},
res = http_server => {
bail!("HTTP server exited unexpectedly {res:?}");
HTTP_SERVER_FAIL.inc();
bail!("HTTP server {res:?}");
},
res = metrics_capture => {
bail!("Metrics Capture exited unexpectedly {res:?}");
METRICS_SERVICE_FAIL.inc();
bail!("Metrics Capture {res:?}");
},
res = prometheus_sync => {
bail!("Prometheus Service exited unexpectedly {res:?}");
PROMETHEUS_SERVER_FAIL.inc();
bail!("Prometheus Service {res:?}");
},
res = postgres => {
bail!("Postgres service exited unexpectedly {res:?}");
POSTGRES_SERVICE_FAIL.inc();
bail!("Postgres service {res:?}");
},
res = jh_transaction_services => {
bail!("Transaction service exited unexpectedly {res:?}");
TX_SERVICE_FAIL.inc();
bail!("Transaction service {res:?}");
}
}
}
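
The new *_FAIL counters pair with the restart loop added in main.rs: each select! arm increments its counter before bailing, so a Prometheus scrape can attribute restarts to the service that died. A hedged sketch of that pattern with a hypothetical counter and task handle:

use prometheus::{opts, register_int_counter, IntCounter};

lazy_static::lazy_static! {
    // hypothetical example counter, named in the same style as the literpc_rpc_*_fail metrics above
    static ref EXAMPLE_SERVICE_FAIL: IntCounter =
        register_int_counter!(opts!("example_service_fail", "Example service failed")).unwrap();
}

async fn supervise_example(task: tokio::task::JoinHandle<anyhow::Result<()>>) -> anyhow::Result<()> {
    // services are expected to run forever, so any completion is treated as a failure
    let res = task.await;
    // record the failure before propagating, so the metric survives the restart
    EXAMPLE_SERVICE_FAIL.inc();
    anyhow::bail!("Example service {res:?}")
}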

View File

@ -4,7 +4,7 @@ use crate::{
};
use clap::Parser;
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, default_value_t = String::from(DEFAULT_RPC_ADDR))]

View File

@ -1,14 +1,20 @@
pub mod rpc_tester;
use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use clap::Parser;
use dotenv::dotenv;
use lite_rpc::{bridge::LiteBridge, cli::Args};
use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_sdk::signature::Keypair;
use std::env;
async fn get_identity_keypair(identity_from_cli: &String) -> Keypair {
use crate::rpc_tester::RpcTester;
const RESTART_DURATION: Duration = Duration::from_secs(20);
async fn get_identity_keypair(identity_from_cli: &str) -> Keypair {
if let Ok(identity_env_var) = env::var("IDENTITY") {
if let Ok(identity_bytes) = serde_json::from_str::<Vec<u8>>(identity_env_var.as_str()) {
Keypair::from_bytes(identity_bytes.as_slice()).unwrap()
@ -23,7 +29,7 @@ async fn get_identity_keypair(identity_from_cli: &String) -> Keypair {
} else if identity_from_cli.is_empty() {
Keypair::new()
} else {
let identity_file = tokio::fs::read_to_string(identity_from_cli.as_str())
let identity_file = tokio::fs::read_to_string(identity_from_cli)
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
@ -31,10 +37,12 @@ async fn get_identity_keypair(identity_from_cli: &String) -> Keypair {
}
}
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
lazy_static::lazy_static! {
static ref RESTARTS: IntCounter =
register_int_counter!(opts!("literpc_rpc_restarts", "Number of times lite rpc restarted")).unwrap();
}
pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
let Args {
rpc_addr,
ws_addr,
@ -47,24 +55,14 @@ pub async fn main() -> anyhow::Result<()> {
identity_keypair,
maximum_retries_per_tx,
transaction_retry_after_secs,
} = Args::parse();
dotenv().ok();
} = args;
let identity = get_identity_keypair(&identity_keypair).await;
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
let enable_postgres = enable_postgres
|| if let Ok(enable_postgres_env_var) = env::var("PG_ENABLED") {
enable_postgres_env_var != "false"
} else {
false
};
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let services = LiteBridge::new(
LiteBridge::new(
rpc_addr,
ws_addr,
fanout_size,
@ -72,23 +70,70 @@ pub async fn main() -> anyhow::Result<()> {
retry_after,
maximum_retries_per_tx,
)
.await?
.await
.context("Error building LiteBridge")?
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
clean_interval_ms,
enable_postgres,
prometheus_addr,
);
)
.await
}
fn get_args() -> Args {
let mut args = Args::parse();
dotenv().ok();
args.enable_postgres = args.enable_postgres
|| if let Ok(enable_postgres_env_var) = env::var("PG_ENABLED") {
enable_postgres_env_var != "false"
} else {
false
};
args
}
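
get_args() layers the PG_ENABLED environment variable on top of the CLI flag: the flag wins when set, otherwise any value other than "false" enables postgres. The same logic as a tiny standalone helper (sketch, hypothetical function name):

fn postgres_enabled(cli_flag: bool) -> bool {
    // CLI flag takes precedence; otherwise PG_ENABLED set to anything but "false" enables postgres
    cli_flag
        || std::env::var("PG_ENABLED")
            .map(|value| value != "false")
            .unwrap_or(false)
}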
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args = get_args();
let rpc_tester = RpcTester::from(&args).start();
let main = tokio::spawn(async move {
loop {
let Err(err) = start_lite_rpc(args.clone()).await else {
return anyhow::Error::msg("LiteBridge services returned without error");
};
// log and restart
log::error!("Services quit unexpectedly {err:?} restarting in {RESTART_DURATION:?}");
tokio::time::sleep(RESTART_DURATION).await;
// increment restart
log::error!("Restarting services");
RESTARTS.inc();
}
});
let ctrl_c_signal = tokio::signal::ctrl_c();
tokio::select! {
res = services => {
bail!("Services quit unexpectedly {res:?}");
err = rpc_tester => {
// This should never happen
unreachable!("{err:?}")
}
err = main => {
// This should never happen
unreachable!("{err:?}")
}
_ = ctrl_c_signal => {
info!("Received ctrl+c signal");
log::info!("Received ctrl+c signal");
Ok(())
}
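
Taken together, these changes replace the old "bail on first failure" main with a supervisor: run the services, and on any error sleep for RESTART_DURATION, bump the RESTARTS counter, and start again. A minimal, self-contained sketch of that loop, with a hypothetical run_services closure standing in for start_lite_rpc:

use std::time::Duration;

async fn supervise<F, Fut>(run_services: F) -> anyhow::Error
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let restart_delay = Duration::from_secs(20);
    loop {
        let Err(err) = run_services().await else {
            // services are expected to run forever; a clean return is still a bug
            return anyhow::Error::msg("services returned without error");
        };
        log::error!("services quit unexpectedly {err:?}, restarting in {restart_delay:?}");
        tokio::time::sleep(restart_delay).await;
        // a restart counter (like RESTARTS above) would be incremented here
    }
}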

View File

@ -6,18 +6,18 @@ use postgres_native_tls::MakeTlsConnector;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{RwLock, RwLockReadGuard},
task::JoinHandle,
};
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
use native_tls::{Certificate, Identity, TlsConnector};
use crate::encoding::BinaryEncoding;
use solana_lite_rpc_core::notifications::{
BlockNotification, NotificationMsg, NotificationReciever, TransactionNotification,
TransactionUpdateNotification,
use solana_lite_rpc_core::{
notifications::{
BlockNotification, NotificationMsg, NotificationReciever, TransactionNotification,
TransactionUpdateNotification,
},
AnyhowJoinHandle,
};
lazy_static::lazy_static! {
@ -411,7 +411,7 @@ impl Postgres {
Ok(self.session.read().await)
}
pub fn start(mut self, mut recv: NotificationReciever) -> JoinHandle<anyhow::Result<()>> {
pub fn start(mut self, mut recv: NotificationReciever) -> AnyhowJoinHandle {
tokio::spawn(async move {
info!("start postgres worker");

View File

@ -0,0 +1,41 @@
use std::net::SocketAddr;
use lite_rpc::cli::Args;
use prometheus::{opts, register_gauge, Gauge};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
lazy_static::lazy_static! {
static ref RPC_RESPONDING: Gauge =
register_gauge!(opts!("literpc_rpc_responding", "If LiteRpc is responding")).unwrap();
}
pub struct RpcTester(RpcClient);
impl From<&Args> for RpcTester {
fn from(value: &Args) -> Self {
let addr: SocketAddr = value
.lite_rpc_http_addr
.parse()
.expect("Invalid literpc http address");
RpcTester(RpcClient::new(format!("http://0.0.0.0:{}", addr.port())))
}
}
impl RpcTester {
/// Starts a loop that checks if the rpc is responding every 5 seconds
pub async fn start(self) -> ! {
loop {
// sleep for 5 seconds
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// do a simple request to self for getVersion
let Err(err) = self.0.get_version().await else {
RPC_RESPONDING.set(1.0);
continue;
};
RPC_RESPONDING.set(0.0);
log::error!("Rpc not responding {err:?}");
}
}
}

View File

@ -1,6 +1,6 @@
use std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
@ -34,6 +34,7 @@ use solana_lite_rpc_core::{
},
subscription_handler::{SubscptionHanderSink, SubscriptionHandler},
tx_store::{TxProps, TxStore},
AnyhowJoinHandle,
};
lazy_static::lazy_static! {
@ -271,7 +272,6 @@ impl BlockListener {
commitment_config: CommitmentConfig,
notifier: Option<NotificationSender>,
estimated_slot: Arc<AtomicU64>,
exit_signal: Arc<AtomicBool>,
) -> anyhow::Result<()> {
let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
let (block_schedule_queue_sx, block_schedule_queue_rx) = async_channel::unbounded::<Slot>();
@ -279,18 +279,13 @@ impl BlockListener {
// task to fetch blocks
//
let this = self.clone();
let exit_signal_l = exit_signal.clone();
let slot_indexer_tasks = (0..8).map(move |_| {
let this = this.clone();
let notifier = notifier.clone();
let slot_retry_queue_sx = slot_retry_queue_sx.clone();
let block_schedule_queue_rx = block_schedule_queue_rx.clone();
let exit_signal_l = exit_signal_l.clone();
let task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let task: AnyhowJoinHandle = tokio::spawn(async move {
loop {
if exit_signal_l.load(Ordering::Relaxed) {
break;
}
match block_schedule_queue_rx.recv().await {
Ok(slot) => {
if commitment_config.is_finalized() {
@ -322,7 +317,6 @@ impl BlockListener {
}
}
}
bail!("Block Slot channel closed")
});
task
@ -334,13 +328,8 @@ impl BlockListener {
let slot_retry_task: JoinHandle<anyhow::Result<()>> = {
let block_schedule_queue_sx = block_schedule_queue_sx.clone();
let recent_slot = recent_slot.clone();
let exit_signal_l = exit_signal.clone();
tokio::spawn(async move {
while let Some((slot, instant)) = slot_retry_queue_rx.recv().await {
if exit_signal_l.load(Ordering::Relaxed) {
break;
}
BLOCKS_IN_RETRY_QUEUE.dec();
let recent_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
// if slot is too old ignore
@ -390,9 +379,6 @@ impl BlockListener {
continue;
}
if exit_signal.load(Ordering::Relaxed) {
break;
}
// filter already processed slots
let new_block_slots: Vec<u64> = (last_latest_slot..new_slot).collect();
// context for lock
@ -414,45 +400,45 @@ impl BlockListener {
last_latest_slot = new_slot;
recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
}
Ok(())
});
tokio::select! {
res = get_slot_task => {
anyhow::bail!("Get slot task exited unexpectedly {res:?}")
bail!("Get slot task exited unexpectedly {res:?}")
}
res = slot_retry_task => {
anyhow::bail!("Slot retry task exited unexpectedly {res:?}")
bail!("Slot retry task exited unexpectedly {res:?}")
},
res = futures::future::try_join_all(slot_indexer_tasks) => {
anyhow::bail!("Slot indexer exited unexpectedly {res:?}")
bail!("Slot indexer exited unexpectedly {res:?}")
},
}
}
// continuosly poll processed blocks and feed into blockstore
pub fn listen_processed(self, exit_signal: Arc<AtomicBool>) -> JoinHandle<anyhow::Result<()>> {
let block_processor = self.block_processor;
pub fn listen_processed(self) -> AnyhowJoinHandle {
tokio::spawn(async move {
info!("processed block listner started");
loop {
if exit_signal.load(Ordering::Relaxed) {
break;
}
let mut errors = 0;
if let Err(err) = block_processor
while errors <= 10 {
if let Err(err) = self
.block_processor
.poll_latest_block(CommitmentConfig::processed())
.await
{
errors += 1;
error!("Error fetching latest processed block {err:?}");
} else {
errors = 0;
}
// sleep
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Ok(())
bail!("{errors} consecutive errors while polling processed blocks")
})
}
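
listen_processed now replaces the exit-signal check with an error budget: up to 10 consecutive poll failures are tolerated, a success resets the count, and exhausting the budget bails so the supervisor restarts the service. The same pattern as a standalone sketch, with a hypothetical poll_once argument:

async fn poll_with_error_budget<F, Fut>(mut poll_once: F) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let mut errors = 0;
    while errors <= 10 {
        if let Err(err) = poll_once().await {
            // count consecutive failures only
            errors += 1;
            log::error!("poll failed {err:?}");
        } else {
            errors = 0;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    anyhow::bail!("{errors} consecutive errors while polling")
}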

View File

@ -1,12 +1,8 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use crate::block_listenser::BlockListener;
use crate::tx_sender::TxSender;
use crate::{block_listenser::BlockListener, tx_sender::TxSender};
use log::info;
use prometheus::core::GenericGauge;
use prometheus::{opts, register_int_gauge};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_lite_rpc_core::block_store::BlockStore;
use tokio::task::JoinHandle;
@ -49,27 +45,19 @@ impl Cleaner {
BLOCKS_IN_BLOCKSTORE.set(self.block_store.number_of_blocks_in_store() as i64);
}
pub fn start(
self,
ttl_duration: Duration,
exit_signal: Arc<AtomicBool>,
) -> JoinHandle<anyhow::Result<()>> {
pub fn start(self, ttl_duration: Duration) -> JoinHandle<anyhow::Result<()>> {
let mut ttl = tokio::time::interval(ttl_duration);
tokio::spawn(async move {
info!("Cleaning memory");
loop {
if exit_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
ttl.tick().await;
self.clean_tx_sender(ttl_duration);
self.clean_block_listeners(ttl_duration);
self.clean_block_store(ttl_duration).await;
}
Ok(())
})
}
}

View File

@ -2,10 +2,10 @@ use std::time::Duration;
use log::error;
use prometheus::{Encoder, TextEncoder};
use solana_lite_rpc_core::AnyhowJoinHandle;
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream, ToSocketAddrs},
task::JoinHandle,
};
pub struct PrometheusSync;
@ -19,7 +19,7 @@ impl PrometheusSync {
)
}
async fn handle_stream(&self, stream: &mut TcpStream) -> anyhow::Result<()> {
async fn handle_stream(stream: &mut TcpStream) -> anyhow::Result<()> {
let mut metrics_buffer = Vec::new();
let encoder = TextEncoder::new();
@ -39,8 +39,7 @@ impl PrometheusSync {
Ok(())
}
pub fn sync(self, addr: impl ToSocketAddrs + Send + 'static) -> JoinHandle<anyhow::Result<()>> {
#[allow(unreachable_code)]
pub fn sync(addr: impl ToSocketAddrs + Send + 'static) -> AnyhowJoinHandle {
tokio::spawn(async move {
let listener = TcpListener::bind(addr).await?;
@ -51,10 +50,8 @@ impl PrometheusSync {
continue;
};
let _ = self.handle_stream(&mut stream).await;
let _ = Self::handle_stream(&mut stream).await;
}
Ok(())
})
}
}
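
handle_stream serves whatever has been registered with the prometheus crate by encoding the default registry into the text exposition format. A minimal sketch of that encoding step on its own, assuming only the prometheus crate API used above:

use prometheus::{Encoder, TextEncoder};

// Sketch: gather the default registry and render it in the Prometheus text format.
fn encode_metrics() -> prometheus::Result<String> {
    let mut buffer = Vec::new();
    // gather() returns every metric registered via register_int_counter!/register_gauge! etc.
    TextEncoder::new().encode(&prometheus::gather(), &mut buffer)?;
    // the text exposition format is plain UTF-8
    Ok(String::from_utf8(buffer).expect("prometheus text format is utf-8"))
}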

View File

@ -17,7 +17,7 @@ use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicU64, Ordering},
Arc,
},
};
@ -141,7 +141,7 @@ impl TpuService {
let next_leaders = self.leader_schedule.get_leaders(load_slot, last_slot).await;
let connections_to_keep = next_leaders
.iter()
.into_iter()
.filter(|x| x.tpu.is_some())
.map(|x| {
let mut addr = x.tpu.unwrap();
@ -163,48 +163,42 @@ impl TpuService {
.await;
}
fn check_exit_signal(exit_signal: &Arc<AtomicBool>) -> bool {
exit_signal.load(Ordering::Relaxed)
}
async fn update_current_slot(
fn update_current_slot(
&self,
update_notifier: tokio::sync::mpsc::UnboundedSender<u64>,
exit_signal: Arc<AtomicBool>,
) {
) -> AnyhowJoinHandle {
let current_slot = self.current_slot.clone();
let update_slot = |slot: u64| {
let rpc_client = self.rpc_client.clone();
let rpc_ws_address = self.rpc_ws_address.clone();
let update_slot = move |slot: u64| {
if slot > current_slot.load(Ordering::Relaxed) {
current_slot.store(slot, Ordering::Relaxed);
CURRENT_SLOT.set(slot as i64);
let _ = update_notifier.send(slot);
}
};
let mut nb_errror = 0;
loop {
if Self::check_exit_signal(&exit_signal) {
break;
}
// always loop update the current slots as it is central to working of TPU
if let Err(e) =
SolanaUtils::poll_slots(self.rpc_client.clone(), &self.rpc_ws_address, update_slot)
.await
{
tokio::spawn(async move {
let mut nb_errror = 0;
while nb_errror < MAX_NB_ERRORS {
// always loop update the current slots as it is central to working of TPU
let Err(err) = SolanaUtils::poll_slots(&rpc_client, &rpc_ws_address, &update_slot).await else {
nb_errror = 0;
continue;
};
nb_errror += 1;
log::info!("Got error while polling slot {}", e);
if nb_errror > MAX_NB_ERRORS {
error!(
"Reached max amount of errors to fetch latest slot, exiting poll slot loop"
);
break;
}
} else {
nb_errror = 0;
log::info!("Got error while polling slot {}", err);
}
}
bail!("Reached max amount of errors to fetch latest slot, exiting poll slot loop")
})
}
pub async fn start(&self, exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
pub async fn start(&self) -> anyhow::Result<()> {
// setup
self.leader_schedule
.load_cluster_info(self.rpc_client.clone())
.await?;
@ -213,25 +207,20 @@ impl TpuService {
self.update_quic_connections().await;
let this = self.clone();
let exit_signal_l = exit_signal.clone();
let jh_update_leaders = tokio::spawn(async move {
let update_leader_schedule_service = tokio::spawn(async move {
let mut last_cluster_info_update = Instant::now();
let leader_schedule_update_interval =
Duration::from_secs(LEADER_SCHEDULE_UPDATE_INTERVAL);
let cluster_info_update_interval = Duration::from_secs(CLUSTERINFO_REFRESH_TIME);
loop {
if Self::check_exit_signal(&exit_signal_l) {
break;
}
tokio::time::sleep(leader_schedule_update_interval).await;
if Self::check_exit_signal(&exit_signal_l) {
break;
}
loop {
tokio::time::sleep(leader_schedule_update_interval).await;
info!("update leader schedule and cluster nodes");
if this.update_leader_schedule().await.is_err() {
error!("unable to update leader shedule");
}
if last_cluster_info_update.elapsed() > cluster_info_update_interval {
if this.update_current_stakes().await.is_err() {
error!("unable to update cluster infos");
@ -242,47 +231,38 @@ impl TpuService {
}
});
let this = self.clone();
let (slot_sender, slot_reciever) = tokio::sync::mpsc::unbounded_channel::<Slot>();
let exit_signal_l = exit_signal.clone();
let slot_sub_task: AnyhowJoinHandle = tokio::spawn(async move {
this.update_current_slot(slot_sender, exit_signal_l).await;
Ok(())
});
let estimated_slot = self.estimated_slot.clone();
let current_slot = self.current_slot.clone();
// Service to poll current slot from upstream rpc
let slot_poll_service = self.update_current_slot(slot_sender);
// Service to estimate slots
let this = self.clone();
let exit_signal_l = exit_signal.clone();
let estimated_slot_calculation = tokio::spawn(async move {
let estimated_slot_service = tokio::spawn(async move {
let mut slot_update_notifier = slot_reciever;
loop {
if Self::check_exit_signal(&exit_signal_l) {
break;
}
if SolanaUtils::slot_estimator(
&mut slot_update_notifier,
current_slot.clone(),
estimated_slot.clone(),
this.current_slot.clone(),
this.estimated_slot.clone(),
)
.await
{
ESTIMATED_SLOT.set(estimated_slot.load(Ordering::Relaxed) as i64);
ESTIMATED_SLOT.set(this.estimated_slot.load(Ordering::Relaxed) as i64);
this.update_quic_connections().await;
}
}
});
tokio::select! {
res = jh_update_leaders => {
bail!("Leader update service exited unexpectedly {res:?}");
res = update_leader_schedule_service => {
bail!("Leader update Service {res:?}");
},
res = slot_sub_task => {
bail!("Leader update service exited unexpectedly {res:?}");
res = slot_poll_service => {
bail!("Slot Poll Service {res:?}");
},
res = estimated_slot_calculation => {
bail!("Estimated slot calculation service exited unexpectedly {res:?}");
res = estimated_slot_service => {
bail!("Estimated slot Service {res:?}");
},
}
}

View File

@ -1,14 +1,11 @@
use crate::tpu_utils::tpu_service::TpuService;
use anyhow::bail;
use log::error;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_lite_rpc_core::tx_store::TxStore;
use std::{
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use solana_lite_rpc_core::{tx_store::TxStore, AnyhowJoinHandle};
use std::time::Duration;
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
task::JoinHandle,
time::Instant,
};
@ -45,56 +42,44 @@ impl TransactionReplayer {
pub fn start_service(
&self,
sender: UnboundedSender<TransactionReplay>,
reciever: UnboundedReceiver<TransactionReplay>,
exit_signal: Arc<AtomicBool>,
) -> JoinHandle<anyhow::Result<()>> {
mut reciever: UnboundedReceiver<TransactionReplay>,
) -> AnyhowJoinHandle {
let tpu_service = self.tpu_service.clone();
let tx_store = self.tx_store.clone();
let retry_after = self.retry_after;
tokio::spawn(async move {
let mut reciever = reciever;
loop {
if exit_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let tx = reciever.recv().await;
match tx {
Some(mut tx_replay) => {
MESSAGES_IN_REPLAY_QUEUE.dec();
if Instant::now() < tx_replay.replay_at {
tokio::time::sleep_until(tx_replay.replay_at).await;
}
if let Some(tx) = tx_store.get(&tx_replay.signature) {
if tx.status.is_some() {
// transaction has been confirmed / no retry needed
continue;
}
} else {
// transaction timed out
continue;
}
// ignore reset error
let _ = tpu_service
.send_transaction(tx_replay.signature.clone(), tx_replay.tx.clone());
if tx_replay.replay_count < tx_replay.max_replay {
tx_replay.replay_count += 1;
tx_replay.replay_at = Instant::now() + retry_after;
if let Err(e) = sender.send(tx_replay) {
error!("error while scheduling replay ({})", e);
continue;
} else {
MESSAGES_IN_REPLAY_QUEUE.inc();
}
}
tokio::spawn(async move {
while let Some(mut tx_replay) = reciever.recv().await {
MESSAGES_IN_REPLAY_QUEUE.dec();
if Instant::now() < tx_replay.replay_at {
tokio::time::sleep_until(tx_replay.replay_at).await;
}
if let Some(tx) = tx_store.get(&tx_replay.signature) {
if tx.status.is_some() {
// transaction has been confirmed / no retry needed
continue;
}
None => {
error!("transaction replay channel broken");
break;
} else {
// transaction timed out
continue;
}
// ignore reset error
let _ =
tpu_service.send_transaction(tx_replay.signature.clone(), tx_replay.tx.clone());
if tx_replay.replay_count < tx_replay.max_replay {
tx_replay.replay_count += 1;
tx_replay.replay_at = Instant::now() + retry_after;
if let Err(e) = sender.send(tx_replay) {
error!("error while scheduling replay ({})", e);
continue;
} else {
MESSAGES_IN_REPLAY_QUEUE.inc();
}
}
}
Ok(())
bail!("transaction replay channel broken");
})
}
}

View File

@ -1,13 +1,7 @@
// This class will manage the lifecycle for a transaction
// It will send, replay if necessary and confirm by listening to blocks
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use std::time::Duration;
use crate::{
block_listenser::BlockListener,
@ -20,12 +14,12 @@ use anyhow::bail;
use solana_lite_rpc_core::{
block_store::{BlockInformation, BlockStore},
notifications::NotificationSender,
AnyhowJoinHandle,
};
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_sdk::{commitment_config::CommitmentConfig, transaction::VersionedTransaction};
use tokio::{
sync::mpsc::{self, Sender, UnboundedSender},
task::JoinHandle,
time::Instant,
};
@ -55,90 +49,79 @@ impl TransactionServiceBuilder {
}
}
pub async fn start(
&self,
pub fn start(
self,
notifier: Option<NotificationSender>,
block_store: BlockStore,
max_retries: usize,
clean_interval: Duration,
) -> (TransactionService, JoinHandle<String>) {
) -> (TransactionService, AnyhowJoinHandle) {
let (transaction_channel, tx_recv) = mpsc::channel(self.max_nb_txs_in_queue);
let (replay_channel, replay_reciever) = tokio::sync::mpsc::unbounded_channel();
let tx_sender = self.tx_sender.clone();
let block_listner = self.block_listner.clone();
let tx_replayer = self.tx_replayer.clone();
let tpu_service = self.tpu_service.clone();
let replay_channel_task = replay_channel.clone();
let exit_signal = Arc::new(AtomicBool::new(false));
let exit_signal_t = exit_signal.clone();
let block_store_t = block_store.clone();
let jh_services: JoinHandle<String> = tokio::spawn(async move {
let tpu_service_fx = tpu_service.start(exit_signal_t.clone());
let jh_services: AnyhowJoinHandle = {
let tx_sender = self.tx_sender.clone();
let block_listner = self.block_listner.clone();
let tx_replayer = self.tx_replayer.clone();
let tpu_service = self.tpu_service.clone();
let replay_channel_task = replay_channel.clone();
let block_store_t = block_store.clone();
let tx_sender_jh =
tx_sender
.clone()
.execute(tx_recv, notifier.clone(), exit_signal_t.clone());
tokio::spawn(async move {
let tpu_service_fx = tpu_service.start();
let replay_service = tx_replayer.start_service(
replay_channel_task,
replay_reciever,
exit_signal_t.clone(),
);
let tx_sender_jh = tx_sender.clone().execute(tx_recv, notifier.clone());
let finalized_block_listener = block_listner.clone().listen(
CommitmentConfig::finalized(),
notifier.clone(),
tpu_service.get_estimated_slot_holder(),
exit_signal_t.clone(),
);
let replay_service =
tx_replayer.start_service(replay_channel_task, replay_reciever);
let confirmed_block_listener = block_listner.clone().listen(
CommitmentConfig::confirmed(),
None,
tpu_service.get_estimated_slot_holder(),
exit_signal_t.clone(),
);
let finalized_block_listener = block_listner.clone().listen(
CommitmentConfig::finalized(),
notifier.clone(),
tpu_service.get_estimated_slot_holder(),
);
let processed_block_listener = block_listner
.clone()
.listen_processed(exit_signal_t.clone());
let confirmed_block_listener = block_listner.clone().listen(
CommitmentConfig::confirmed(),
None,
tpu_service.get_estimated_slot_holder(),
);
let cleaner = Cleaner::new(tx_sender.clone(), block_listner.clone(), block_store_t)
.start(clean_interval, exit_signal_t);
let processed_block_listener = block_listner.clone().listen_processed();
tokio::select! {
res = tpu_service_fx => {
format!("{res:?}")
},
res = tx_sender_jh => {
format!("{res:?}")
},
let cleaner = Cleaner::new(tx_sender.clone(), block_listner.clone(), block_store_t)
.start(clean_interval);
res = finalized_block_listener => {
format!("{res:?}")
},
res = confirmed_block_listener => {
format!("{res:?}")
},
res = processed_block_listener => {
format!("{res:?}")
},
res = replay_service => {
format!("{res:?}")
},
res = cleaner => {
format!("{res:?}")
},
}
});
tokio::select! {
res = tpu_service_fx => {
bail!("Tpu Service {res:?}")
},
res = tx_sender_jh => {
bail!("Tx Sender {res:?}")
},
res = finalized_block_listener => {
bail!("Finalized Block Listener {res:?}")
},
res = confirmed_block_listener => {
bail!("Confirmed Block Listener {res:?}")
},
res = processed_block_listener => {
bail!("Processed Block Listener {res:?}")
},
res = replay_service => {
bail!("Replay Service {res:?}")
},
res = cleaner => {
bail!("Cleaner {res:?}")
},
}
})
};
(
TransactionService {
transaction_channel,
replay_channel,
exit_signal,
block_store,
max_retries,
replay_after: self.tx_replayer.retry_after,
@ -152,7 +135,6 @@ impl TransactionServiceBuilder {
pub struct TransactionService {
pub transaction_channel: Sender<(String, WireTransaction, u64)>,
pub replay_channel: UnboundedSender<TransactionReplay>,
pub exit_signal: Arc<AtomicBool>,
pub block_store: BlockStore,
pub max_retries: usize,
pub replay_after: Duration,
@ -208,8 +190,4 @@ impl TransactionService {
}
Ok(signature.to_string())
}
pub fn stop(&self) {
self.exit_signal.store(true, Ordering::Relaxed)
}
}

View File

@ -1,7 +1,4 @@
use std::{
sync::{atomic::AtomicBool, Arc},
time::{Duration, Instant},
};
use std::time::{Duration, Instant};
use anyhow::bail;
use chrono::Utc;
@ -132,17 +129,12 @@ impl TxSender {
self,
mut recv: Receiver<(String, WireTransaction, u64)>,
notifier: Option<NotificationSender>,
exit_signal: Arc<AtomicBool>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
let tx_sender = self.clone();
loop {
let mut sigs_and_slots = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut txs = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut timeout_interval = INTERVAL_PER_BATCH_IN_MS;
if exit_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// In solana there in sig verify stage rate is limited to 2000 txs in 50ms
// taking this as reference
@ -154,8 +146,9 @@ impl TxSender {
Ok(value) => match value {
Some((sig, tx, slot)) => {
TXS_IN_CHANNEL.dec();
// duplicate transaction
if self.txs_sent_store.contains_key(&sig) {
// duplicate transaction
continue;
}
sigs_and_slots.push((sig, slot));
@ -182,11 +175,10 @@ impl TxSender {
}
TX_BATCH_SIZES.set(txs.len() as i64);
tx_sender
.forward_txs(sigs_and_slots, txs, notifier.clone())
self.forward_txs(sigs_and_slots, txs, notifier.clone())
.await;
}
Ok(())
})
}