diff --git a/src/block_store.rs b/src/block_store.rs index 65246217..323a63a4 100644 --- a/src/block_store.rs +++ b/src/block_store.rs @@ -1,21 +1,42 @@ -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use dashmap::DashMap; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::commitment_config::CommitmentConfig; +use tokio::sync::RwLock; use crate::workers::BlockInformation; +#[derive(Clone)] pub struct BlockStore { blocks: Arc>, - latest_block_hash: Arc>, + latest_confirmed_blockhash: Arc>, + latest_finalized_blockhash: Arc>, } impl BlockStore { - pub async fn new( + pub async fn new(rpc_client: &RpcClient) -> anyhow::Result { + let (confirmed_blockhash, confirmed_block) = + Self::fetch_latest(rpc_client, CommitmentConfig::confirmed()).await?; + let (finalized_blockhash, finalized_block) = + Self::fetch_latest(rpc_client, CommitmentConfig::finalized()).await?; + + Ok(Self { + latest_confirmed_blockhash: Arc::new(RwLock::new(confirmed_blockhash.clone())), + latest_finalized_blockhash: Arc::new(RwLock::new(finalized_blockhash.clone())), + blocks: Arc::new({ + let map = DashMap::new(); + map.insert(confirmed_blockhash, confirmed_block); + map.insert(finalized_blockhash, finalized_block); + map + }), + }) + } + + pub async fn fetch_latest( rpc_client: &RpcClient, commitment_config: CommitmentConfig, - ) -> anyhow::Result { + ) -> anyhow::Result<(String, BlockInformation)> { let (latest_block_hash, block_height) = rpc_client .get_latest_blockhash_with_commitment(commitment_config) .await?; @@ -25,13 +46,55 @@ impl BlockStore { .get_slot_with_commitment(commitment_config) .await?; - Ok(Self { - latest_block_hash: Arc::new(RwLock::new(latest_block_hash.clone())), - blocks: Arc::new({ - let map = DashMap::new(); - map.insert(latest_block_hash, BlockInformation { slot, block_height }); - map - }), - }) + Ok((latest_block_hash, BlockInformation { slot, block_height })) + } + + pub async fn get_block_info(&self, blockhash: &str) -> Option { + let Some(info) = self.blocks.get(blockhash) else { + return None; + }; + + Some(info.value().to_owned()) + } + + pub fn get_latest_blockhash(&self, commitment_config: CommitmentConfig) -> Arc> { + if commitment_config.is_finalized() { + self.latest_confirmed_blockhash.clone() + } else { + self.latest_finalized_blockhash.clone() + } + } + + pub async fn get_latest_block_info( + &self, + commitment_config: CommitmentConfig, + ) -> (String, BlockInformation) { + let blockhash = self + .get_latest_blockhash(commitment_config) + .read() + .await + .to_owned(); + + let block_info = self + .blocks + .get(&blockhash) + .expect("Race Condition: Latest block not in block store") + .value() + .to_owned(); + + (blockhash, block_info) + } + + pub async fn add_block( + &self, + blockhash: String, + block_info: BlockInformation, + commitment_config: CommitmentConfig, + ) { + // Write to block store first in order to prevent + // any race condition i.e prevent some one to + // ask the map what it doesn't have rn + self.blocks.insert(blockhash.clone(), block_info); + *self.get_latest_blockhash(commitment_config).write().await = blockhash; } } diff --git a/src/bridge.rs b/src/bridge.rs index 77e774f2..98bf0e8d 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -1,4 +1,5 @@ use crate::{ + block_store::BlockStore, configs::{IsBlockHashValidConfig, SendTransactionConfig}, encoding::BinaryEncoding, rpc::LiteRpcServer, @@ -23,9 +24,7 @@ use solana_rpc_client_api::{ response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo}, }; use solana_sdk::{ - commitment_config::{CommitmentConfig, CommitmentLevel}, - hash::Hash, - pubkey::Pubkey, + commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey, transaction::VersionedTransaction, }; use solana_transaction_status::TransactionStatus; @@ -42,14 +41,13 @@ pub struct LiteBridge { // None if LiteBridge is not executed pub tx_send: Option>, pub tx_sender: TxSender, - pub finalized_block_listener: BlockListener, - pub confirmed_block_listener: BlockListener, + pub block_listner: BlockListener, + pub block_store: BlockStore, } impl LiteBridge { pub async fn new(rpc_url: String, ws_addr: String, fanout_slots: u64) -> anyhow::Result { let rpc_client = Arc::new(RpcClient::new(rpc_url.clone())); - let pub_sub_client = Arc::new(PubsubClient::new(&ws_addr).await?); let tpu_manager = @@ -57,40 +55,25 @@ impl LiteBridge { let tx_sender = TxSender::new(tpu_manager.clone()); - let finalized_block_listener = BlockListener::new( + let block_store = BlockStore::new(&rpc_client).await?; + + let block_listner = BlockListener::new( pub_sub_client.clone(), rpc_client.clone(), tx_sender.clone(), - CommitmentConfig::finalized(), - ) - .await?; - - let confirmed_block_listener = BlockListener::new( - pub_sub_client, - rpc_client.clone(), - tx_sender.clone(), - CommitmentConfig::confirmed(), - ) - .await?; + block_store.clone(), + ); Ok(Self { rpc_client, tpu_manager, tx_send: None, tx_sender, - finalized_block_listener, - confirmed_block_listener, + block_listner, + block_store, }) } - pub fn get_block_listner(&self, commitment_config: CommitmentConfig) -> BlockListener { - if let CommitmentLevel::Finalized = commitment_config.commitment { - self.finalized_block_listener.clone() - } else { - self.confirmed_block_listener.clone() - } - } - /// List for `JsonRpc` requests pub async fn start_services( mut self, @@ -127,19 +110,17 @@ impl LiteBridge { let metrics_capture = metrics_capture.capture(); let finalized_block_listener = self - .finalized_block_listener + .block_listner .clone() - .listen(postgres_send.clone()); + .listen(CommitmentConfig::finalized(), postgres_send.clone()); - let confirmed_block_listener = self.confirmed_block_listener.clone().listen(None); - let cleaner = Cleaner::new( - self.tx_sender.clone(), - [ - self.finalized_block_listener.clone(), - self.confirmed_block_listener.clone(), - ], - ) - .start(clean_interval); + let confirmed_block_listener = self + .block_listner + .clone() + .listen(CommitmentConfig::confirmed(), None); + + let cleaner = + Cleaner::new(self.tx_sender.clone(), self.block_listner.clone()).start(clean_interval); let rpc = self.into_rpc(); @@ -219,11 +200,11 @@ impl LiteRpcServer for LiteBridge { let sig = tx.get_signature(); let Some(BlockInformation { slot, .. }) = self - .confirmed_block_listener + .block_store .get_block_info(&tx.get_recent_blockhash().to_string()) .await else { log::warn!("block"); - return Err(jsonrpsee::core::Error::Custom("Blockhash not found in confirmed block store".to_string())); + return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string())); }; self.tx_send @@ -245,9 +226,10 @@ impl LiteRpcServer for LiteBridge { CommitmentConfig::default() }; - let block_listner = self.get_block_listner(commitment_config); - let (blockhash, BlockInformation { slot, block_height }) = - block_listner.get_latest_block_info().await; + let (blockhash, BlockInformation { slot, block_height }) = self + .block_store + .get_latest_block_info(commitment_config) + .await; Ok(RpcResponse { context: RpcResponseContext { @@ -276,8 +258,6 @@ impl LiteRpcServer for LiteBridge { } }; - let block_listner = self.get_block_listner(commitment); - let is_valid = match self .rpc_client .is_blockhash_valid(&blockhash, commitment) @@ -289,7 +269,12 @@ impl LiteRpcServer for LiteBridge { } }; - let slot = block_listner.get_latest_block_info().await.1.slot; + let slot = self + .block_store + .get_latest_block_info(commitment) + .await + .1 + .slot; Ok(RpcResponse { context: RpcResponseContext { @@ -318,8 +303,8 @@ impl LiteRpcServer for LiteBridge { Ok(RpcResponse { context: RpcResponseContext { slot: self - .finalized_block_listener - .get_latest_block_info() + .block_store + .get_latest_block_info(CommitmentConfig::finalized()) .await .1 .slot, @@ -372,11 +357,10 @@ impl LiteRpcServer for LiteBridge { &self, mut sink: SubscriptionSink, signature: String, - commitment_config: CommitmentConfig, + _commitment_config: CommitmentConfig, ) -> SubscriptionResult { sink.accept()?; - self.get_block_listner(commitment_config) - .signature_subscribe(signature, sink); + self.block_listner.signature_subscribe(signature, sink); Ok(()) } } diff --git a/src/cli.rs b/src/cli.rs index ac8bcd42..ab4e03c2 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -29,5 +29,5 @@ pub struct Args { pub clean_interval_ms: u64, /// addr to postgres #[arg(short = 'p', long)] - pub enable_postgres: bool + pub enable_postgres: bool, } diff --git a/src/lib.rs b/src/lib.rs index 041530c7..7916cd4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ use const_env::from_env; use solana_transaction_status::TransactionConfirmationStatus; +pub mod block_store; pub mod bridge; pub mod cli; pub mod configs; @@ -9,7 +10,6 @@ pub mod errors; pub mod rpc; pub mod tpu_manager; pub mod workers; -pub mod block_store; #[from_env] pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899"; diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs index 0eafe1c0..a4fe623b 100644 --- a/src/tpu_manager.rs +++ b/src/tpu_manager.rs @@ -16,7 +16,7 @@ pub struct TpuManager { error_count: Arc, rpc_client: Arc, tpu_client: Arc>, - ws_addr: String, + pub ws_addr: String, fanout_slots: u64, } diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index 280b6a56..b60a5183 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -19,11 +19,14 @@ use solana_transaction_status::{ TransactionStatus, UiConfirmedBlock, UiTransactionStatusMeta, }; use tokio::{ - sync::{mpsc::Sender, RwLock}, + sync::{mpsc::Sender}, task::JoinHandle, }; -use crate::workers::{PostgresBlock, PostgresMsg, PostgresUpdateTx}; +use crate::{ + block_store::{BlockStore}, + workers::{PostgresBlock, PostgresMsg, PostgresUpdateTx}, +}; use super::{PostgresMpscSend, TxProps, TxSender}; @@ -32,10 +35,8 @@ use super::{PostgresMpscSend, TxProps, TxSender}; #[derive(Clone)] pub struct BlockListener { pub_sub_client: Arc, - commitment_config: CommitmentConfig, tx_sender: TxSender, - block_store: Arc>, - latest_block_hash: Arc>, + block_store: BlockStore, pub signature_subscribers: Arc>, } @@ -51,25 +52,18 @@ pub struct BlockListnerNotificatons { } impl BlockListener { - pub async fn new( + pub fn new( pub_sub_client: Arc, - rpc_client: Arc, + _rpc_client: Arc, tx_sender: TxSender, - commitment_config: CommitmentConfig, - ) -> anyhow::Result { - - Ok(Self { + block_store: BlockStore, + ) -> Self { + Self { pub_sub_client, tx_sender, - latest_block_hash: Arc::new(RwLock::new(latest_block_hash.clone())), - block_store: Arc::new({ - let map = DashMap::new(); - map.insert(latest_block_hash, BlockInformation { slot, block_height }); - map - }), - commitment_config, + block_store, signature_subscribers: Default::default(), - }) + } } pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize { @@ -82,31 +76,6 @@ impl BlockListener { num_of_sigs_commited } - pub async fn get_latest_block_info(&self) -> (String, BlockInformation) { - let blockhash = &*self.latest_block_hash.read().await; - - ( - blockhash.to_owned(), - self.block_store - .get(blockhash) - .expect("Race Condition: Latest block not in block store") - .value() - .to_owned(), - ) - } - - pub async fn get_block_info(&self, blockhash: &str) -> Option { - let Some(info) = self.block_store.get(blockhash) else { - return None; - }; - - Some(info.value().to_owned()) - } - - pub async fn get_latest_blockhash(&self) -> String { - self.latest_block_hash.read().await.to_owned() - } - pub fn signature_subscribe(&self, signature: String, sink: SubscriptionSink) { let _ = self.signature_subscribers.insert(signature, sink); } @@ -115,9 +84,13 @@ impl BlockListener { self.signature_subscribers.remove(&signature); } - pub fn listen(self, postgres: Option) -> JoinHandle> { + pub fn listen( + self, + commitment_config: CommitmentConfig, + postgres: Option, + ) -> JoinHandle> { tokio::spawn(async move { - let commitment = self.commitment_config.commitment; + let commitment = commitment_config.commitment; let comfirmation_status = match commitment { CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized, @@ -131,7 +104,7 @@ impl BlockListener { .block_subscribe( RpcBlockSubscribeFilter::All, Some(RpcBlockSubscribeConfig { - commitment: Some(self.commitment_config), + commitment: Some(commitment_config), encoding: None, transaction_details: Some( solana_transaction_status::TransactionDetails::Full, @@ -164,12 +137,13 @@ impl BlockListener { let parent_slot = block.parent_slot; - // Write to block store first in order to prevent - // any race condition i.e prevent some one to - // ask the map what it doesn't have rn self.block_store - .insert(blockhash.clone(), BlockInformation { slot, block_height }); - *self.latest_block_hash.write().await = blockhash; + .add_block( + blockhash.clone(), + BlockInformation { slot, block_height }, + commitment_config, + ) + .await; if let Some(postgres) = &postgres { let Some(rewards) = block.rewards else { @@ -182,12 +156,12 @@ impl BlockListener { continue; }; - let leader_id = &leader_reward.pubkey; + let _leader_id = &leader_reward.pubkey; postgres .send(PostgresMsg::PostgresBlock(PostgresBlock { slot: slot as i64, - leader_id, //FIX: + leader_id: 0, //FIX: parent_slot: parent_slot as i64, })) .expect("Error sending block to postgres service"); diff --git a/src/workers/cleaner.rs b/src/workers/cleaner.rs index 3a744d4e..1009140a 100644 --- a/src/workers/cleaner.rs +++ b/src/workers/cleaner.rs @@ -7,16 +7,16 @@ use super::{BlockListener, TxSender}; /// Background worker which cleans up memory #[derive(Clone)] -pub struct Cleaner { +pub struct Cleaner { tx_sender: TxSender, - block_listeners: [BlockListener; N], + block_listenser: BlockListener, } -impl Cleaner { - pub fn new(tx_sender: TxSender, block_listeners: [BlockListener; N]) -> Self { +impl Cleaner { + pub fn new(tx_sender: TxSender, block_listenser: BlockListener) -> Self { Self { tx_sender, - block_listeners, + block_listenser, } } @@ -38,21 +38,19 @@ impl Cleaner { /// Clean Signature Subscribers from Block Listeners pub fn clean_block_listeners(&self) { - for block_listenser in &self.block_listeners { - let mut to_remove = vec![]; + let mut to_remove = vec![]; - for subscriber in block_listenser.signature_subscribers.iter() { - if subscriber.value().is_closed() { - to_remove.push(subscriber.key().to_owned()); - } + for subscriber in self.block_listenser.signature_subscribers.iter() { + if subscriber.value().is_closed() { + to_remove.push(subscriber.key().to_owned()); } - - for to_remove in &to_remove { - block_listenser.signature_subscribers.remove(to_remove); - } - - info!("Cleaned {} Signature Subscribers", to_remove.len()); } + + for to_remove in &to_remove { + self.block_listenser.signature_subscribers.remove(to_remove); + } + + info!("Cleaned {} Signature Subscribers", to_remove.len()); } pub fn start(self, ttl_duration: Duration) -> JoinHandle> { diff --git a/tests/workers.rs b/tests/workers.rs index b535443b..5d056724 100644 --- a/tests/workers.rs +++ b/tests/workers.rs @@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration}; use bench::helpers::BenchHelper; use futures::future::try_join_all; use lite_rpc::{ + block_store::BlockStore, encoding::BinaryEncoding, tpu_manager::TpuManager, workers::{BlockListener, TxSender}, @@ -35,20 +36,21 @@ async fn send_and_confirm_txs() { let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await.unwrap()); let tx_sender = TxSender::new(tpu_client); + let block_store = BlockStore::new(&rpc_client).await.unwrap(); let block_listener = BlockListener::new( pub_sub_client.clone(), rpc_client.clone(), tx_sender.clone(), - CommitmentConfig::confirmed(), - ) - .await - .unwrap(); + block_store, + ); let (tx_send, tx_recv) = mpsc::unbounded_channel(); let services = try_join_all(vec![ - block_listener.clone().listen(None), + block_listener + .clone() + .listen(CommitmentConfig::confirmed(), None), tx_sender.clone().execute( tx_recv, DEFAULT_TX_BATCH_SIZE,