From 2bc65f9ec6a02db57b6dc50b95f7014137c988aa Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Mon, 12 Jun 2023 05:45:40 +0530 Subject: [PATCH] block_processor and bridge --- lite-rpc/src/bridge.rs | 87 ++++++++++++++++++----------- lite-rpc/src/lib.rs | 2 + lite-rpc/src/main.rs | 31 +++++------ services/src/block_listenser.rs | 99 +++++++++++++++++++++------------ 4 files changed, 132 insertions(+), 87 deletions(-) diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index c5c3a010..a171160f 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -3,7 +3,7 @@ use crate::{ encoding::BinaryEncoding, postgres::Postgres, rpc::LiteRpcServer, - DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, + AnyhowJoinHandle, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, }; use solana_lite_rpc_services::{ @@ -36,13 +36,12 @@ use solana_rpc_client_api::{ }; use solana_sdk::{ commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey, signature::Keypair, - transaction::VersionedTransaction, slot_history::Slot, + slot_history::Slot, transaction::VersionedTransaction, }; use solana_transaction_status::TransactionStatus; use tokio::{ net::ToSocketAddrs, sync::mpsc::{self, Sender, UnboundedSender}, - task::JoinHandle, time::Instant, }; @@ -134,7 +133,7 @@ impl LiteBridge { clean_interval: Duration, enable_postgres: bool, prometheus_addr: T, - ) -> anyhow::Result>>> { + ) -> anyhow::Result<()> { let (postgres, postgres_send) = if enable_postgres { let (postgres_send, postgres_recv) = mpsc::unbounded_channel(); let postgres = Postgres::new().await?; @@ -145,7 +144,8 @@ impl LiteBridge { (None, None) }; - let mut tpu_services = self.tpu_service.start().await?; + let tpu_service = self.tpu_service.clone(); + let tpu_service = tpu_service.start(); let (tx_send, tx_recv) = mpsc::channel(DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE); self.tx_send_channel = Some(tx_send); @@ -200,13 +200,13 @@ impl LiteBridge { .await? .start(rpc)?; - let ws_server = tokio::spawn(async move { + let ws_server: AnyhowJoinHandle = tokio::spawn(async move { info!("Websocket Server started at {ws_addr:?}"); ws_server_handle.stopped().await; bail!("Websocket server stopped"); }); - let http_server = tokio::spawn(async move { + let http_server: AnyhowJoinHandle = tokio::spawn(async move { info!("HTTP Server started at {http_addr:?}"); http_server_handle.stopped().await; bail!("HTTP server stopped"); @@ -215,26 +215,53 @@ impl LiteBridge { (ws_server, http_server) }; - let mut services = vec![ - ws_server, - http_server, - tx_sender, - finalized_block_listener, - confirmed_block_listener, - processed_block_listener, - metrics_capture, - prometheus_sync, - cleaner, - replay_service, - ]; + let postgres = tokio::spawn(async { + let Some(postgres) = postgres else { + std::future::pending::<()>().await; + unreachable!(); + }; - services.append(&mut tpu_services); + postgres.await + }); - if let Some(postgres) = postgres { - services.push(postgres); + tokio::select! { + res = tpu_service => { + bail!("Tpu Services exited unexpectedly {res:?}"); + }, + res = ws_server => { + bail!("WebSocket server exited unexpectedly {res:?}"); + }, + res = http_server => { + bail!("HTTP server exited unexpectedly {res:?}"); + }, + res = tx_sender => { + bail!("Tx Sender exited unexpectedly {res:?}"); + }, + res = finalized_block_listener => { + bail!("Finalized Block Listener exited unexpectedly {res:?}"); + }, + res = confirmed_block_listener => { + bail!("Confirmed Block Listener exited unexpectedly {res:?}"); + }, + res = processed_block_listener => { + bail!("Processed Block Listener exited unexpectedly {res:?}"); + }, + res = metrics_capture => { + bail!("Metrics Capture exited unexpectedly {res:?}"); + }, + res = prometheus_sync => { + bail!("Prometheus Service exited unexpectedly {res:?}"); + }, + res = cleaner => { + bail!("Cleaner Service exited unexpectedly {res:?}"); + }, + res = replay_service => { + bail!("Tx replay service exited unexpectedly {res:?}"); + }, + res = postgres => { + bail!("Postgres service exited unexpectedly {res:?}"); + }, } - - Ok(services) } } @@ -458,19 +485,13 @@ impl LiteRpcServer for LiteBridge { Ok(airdrop_sig) } - async fn get_slot( - &self, - config: Option, - ) -> crate::rpc::Result { + async fn get_slot(&self, config: Option) -> crate::rpc::Result { let commitment_config = config .map(|config| config.commitment.unwrap_or_default()) .unwrap_or_default(); - let (_, - BlockInformation { - slot, .. - } - ) = self.block_store.get_latest_block(commitment_config).await; + let (_, BlockInformation { slot, .. }) = + self.block_store.get_latest_block(commitment_config).await; Ok(slot) } diff --git a/lite-rpc/src/lib.rs b/lite-rpc/src/lib.rs index ad84c259..7e2d232c 100644 --- a/lite-rpc/src/lib.rs +++ b/lite-rpc/src/lib.rs @@ -9,6 +9,8 @@ pub mod errors; pub mod postgres; pub mod rpc; +pub type AnyhowJoinHandle = tokio::task::JoinHandle>; + #[from_env] pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899"; #[from_env] diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 59e7f679..3470ec6b 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -63,7 +63,8 @@ pub async fn main() -> anyhow::Result<()> { }; let retry_after = Duration::from_secs(transaction_retry_after_secs); - let light_bridge = LiteBridge::new( + + let services = LiteBridge::new( rpc_addr, ws_addr, fanout_size, @@ -71,29 +72,25 @@ pub async fn main() -> anyhow::Result<()> { retry_after, maximum_retries_per_tx, ) - .await?; - - let services = light_bridge - .start_services( - lite_rpc_http_addr, - lite_rpc_ws_addr, - clean_interval_ms, - enable_postgres, - prometheus_addr, - ) - .await?; - - let services = futures::future::try_join_all(services); + .await? + .start_services( + lite_rpc_http_addr, + lite_rpc_ws_addr, + clean_interval_ms, + enable_postgres, + prometheus_addr, + ); let ctrl_c_signal = tokio::signal::ctrl_c(); tokio::select! { - _ = services => { - bail!("Services quit unexpectedly"); + res = services => { + bail!("Services quit unexpectedly {res:?}"); } _ = ctrl_c_signal => { info!("Received ctrl+c signal"); - Ok(()) } } + + Ok(()) } diff --git a/services/src/block_listenser.rs b/services/src/block_listenser.rs index 11cd2107..bdd59bbd 100644 --- a/services/src/block_listenser.rs +++ b/services/src/block_listenser.rs @@ -1,3 +1,4 @@ + use std::{ sync::{ atomic::{AtomicU64, Ordering}, @@ -6,9 +7,10 @@ use std::{ time::Duration, }; -use anyhow::bail; +use anyhow::{bail, Context}; use chrono::{TimeZone, Utc}; -use jsonrpsee::SubscriptionSink; + +use jsonrpsee::{SubscriptionSink}; use log::{error, info, trace}; use prometheus::{ core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter, @@ -17,6 +19,7 @@ use prometheus::{ use solana_rpc_client::nonblocking::rpc_client::RpcClient; + use solana_sdk::{ commitment_config::{CommitmentConfig, CommitmentLevel}, slot_history::Slot, @@ -250,32 +253,26 @@ impl BlockListener { Ok(()) } - pub fn listen( + pub async fn listen( self, commitment_config: CommitmentConfig, notifier: Option, estimated_slot: Arc, - ) -> JoinHandle> { + ) -> anyhow::Result<()> { let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel(); let (block_schedule_queue_sx, block_schedule_queue_rx) = async_channel::unbounded::(); // task to fetch blocks - for _i in 0..8 { - let this = self.clone(); + // + let this = self.clone(); + let slot_indexer_tasks = (0..8).map(move |_| { + let this = this.clone(); let notifier = notifier.clone(); let slot_retry_queue_sx = slot_retry_queue_sx.clone(); let block_schedule_queue_rx = block_schedule_queue_rx.clone(); - tokio::spawn(async move { - loop { - let slot = match block_schedule_queue_rx.recv().await { - Ok(v) => v, - Err(e) => { - error!("Recv error on block channel {}", e); - break; - } - }; - + let task: JoinHandle> = tokio::spawn(async move { + while let Ok(slot) = block_schedule_queue_rx.recv().await { if commitment_config.is_finalized() { BLOCKS_IN_FINALIZED_QUEUE.dec(); } else { @@ -291,19 +288,28 @@ impl BlockListener { let retry_at = tokio::time::Instant::now() .checked_add(Duration::from_millis(10)) .unwrap(); - let _ = slot_retry_queue_sx.send((slot, retry_at)); + + slot_retry_queue_sx + .send((slot, retry_at)) + .context("Error sending slot to retry queue from slot indexer task")?; + BLOCKS_IN_RETRY_QUEUE.inc(); }; } + + bail!("Block Slot channel closed") }); - } + + task + }); // a task that will queue back the slots to be retried after a certain delay let recent_slot = Arc::new(AtomicU64::new(0)); - { + let slot_retry_task: JoinHandle> = { let block_schedule_queue_sx = block_schedule_queue_sx.clone(); let recent_slot = recent_slot.clone(); + tokio::spawn(async move { while let Some((slot, instant)) = slot_retry_queue_rx.recv().await { BLOCKS_IN_RETRY_QUEUE.dec(); @@ -315,22 +321,26 @@ impl BlockListener { continue; } - let now = tokio::time::Instant::now(); - if now < instant { + if tokio::time::Instant::now() < instant { tokio::time::sleep_until(instant).await; } - if block_schedule_queue_sx.send(slot).await.is_ok() { - if commitment_config.is_finalized() { - BLOCKS_IN_FINALIZED_QUEUE.inc(); - } else { - BLOCKS_IN_CONFIRMED_QUEUE.inc(); - } + + block_schedule_queue_sx.send(slot).await.context( + "Slot retry que failed to send block to block_schedule_queue_sx", + )?; + + if commitment_config.is_finalized() { + BLOCKS_IN_FINALIZED_QUEUE.inc(); + } else { + BLOCKS_IN_CONFIRMED_QUEUE.inc(); } } - }); - } - tokio::spawn(async move { + bail!("Slot retry task exit") + }) + }; + + let get_slot_task: JoinHandle> = tokio::spawn(async move { info!("{commitment_config:?} block listner started"); let last_latest_slot = self @@ -355,9 +365,9 @@ impl BlockListener { // context for lock { for slot in new_block_slots { - if let Err(e) = block_schedule_queue_sx.send(slot).await { - error!("error sending of block schedule queue {}", e); - } else if commitment_config.is_finalized() { + block_schedule_queue_sx.send(slot).await?; + + if commitment_config.is_finalized() { BLOCKS_IN_FINALIZED_QUEUE.inc(); } else { BLOCKS_IN_CONFIRMED_QUEUE.inc(); @@ -368,7 +378,19 @@ impl BlockListener { last_latest_slot = new_slot; recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed); } - }) + }); + + tokio::select! { + res = get_slot_task => { + anyhow::bail!("Get slot task exited unexpectedly {res:?}") + } + res = slot_retry_task => { + anyhow::bail!("Slot retry task exited unexpectedly {res:?}") + }, + res = futures::future::try_join_all(slot_indexer_tasks) => { + anyhow::bail!("Slot indexer exited unexpectedly {res:?}") + }, + } } // continuosly poll processed blocks and feed into blockstore @@ -379,10 +401,13 @@ impl BlockListener { info!("processed block listner started"); loop { - // ignore errors from processed block polling - let _ = block_processor + if let Err(err) = block_processor .poll_latest_block(CommitmentConfig::processed()) - .await; + .await { + error!("Error fetching latest processed block {err:?}"); + } + + // sleep tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; } })