untested: block_store and common block_listener

This commit is contained in:
aniketfuryrocks 2023-02-02 19:00:15 +05:30
parent 8aab926e39
commit 7f97b62845
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
8 changed files with 164 additions and 143 deletions

View File

@ -1,21 +1,42 @@
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use dashmap::DashMap; use dashmap::DashMap;
use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::commitment_config::CommitmentConfig;
use tokio::sync::RwLock;
use crate::workers::BlockInformation; use crate::workers::BlockInformation;
#[derive(Clone)]
pub struct BlockStore { pub struct BlockStore {
blocks: Arc<DashMap<String, BlockInformation>>, blocks: Arc<DashMap<String, BlockInformation>>,
latest_block_hash: Arc<RwLock<String>>, latest_confirmed_blockhash: Arc<RwLock<String>>,
latest_finalized_blockhash: Arc<RwLock<String>>,
} }
impl BlockStore { impl BlockStore {
pub async fn new( pub async fn new(rpc_client: &RpcClient) -> anyhow::Result<Self> {
let (confirmed_blockhash, confirmed_block) =
Self::fetch_latest(rpc_client, CommitmentConfig::confirmed()).await?;
let (finalized_blockhash, finalized_block) =
Self::fetch_latest(rpc_client, CommitmentConfig::finalized()).await?;
Ok(Self {
latest_confirmed_blockhash: Arc::new(RwLock::new(confirmed_blockhash.clone())),
latest_finalized_blockhash: Arc::new(RwLock::new(finalized_blockhash.clone())),
blocks: Arc::new({
let map = DashMap::new();
map.insert(confirmed_blockhash, confirmed_block);
map.insert(finalized_blockhash, finalized_block);
map
}),
})
}
pub async fn fetch_latest(
rpc_client: &RpcClient, rpc_client: &RpcClient,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> anyhow::Result<Self> { ) -> anyhow::Result<(String, BlockInformation)> {
let (latest_block_hash, block_height) = rpc_client let (latest_block_hash, block_height) = rpc_client
.get_latest_blockhash_with_commitment(commitment_config) .get_latest_blockhash_with_commitment(commitment_config)
.await?; .await?;
@ -25,13 +46,55 @@ impl BlockStore {
.get_slot_with_commitment(commitment_config) .get_slot_with_commitment(commitment_config)
.await?; .await?;
Ok(Self { Ok((latest_block_hash, BlockInformation { slot, block_height }))
latest_block_hash: Arc::new(RwLock::new(latest_block_hash.clone())), }
blocks: Arc::new({
let map = DashMap::new(); pub async fn get_block_info(&self, blockhash: &str) -> Option<BlockInformation> {
map.insert(latest_block_hash, BlockInformation { slot, block_height }); let Some(info) = self.blocks.get(blockhash) else {
map return None;
}), };
})
Some(info.value().to_owned())
}
pub fn get_latest_blockhash(&self, commitment_config: CommitmentConfig) -> Arc<RwLock<String>> {
if commitment_config.is_finalized() {
self.latest_confirmed_blockhash.clone()
} else {
self.latest_finalized_blockhash.clone()
}
}
pub async fn get_latest_block_info(
&self,
commitment_config: CommitmentConfig,
) -> (String, BlockInformation) {
let blockhash = self
.get_latest_blockhash(commitment_config)
.read()
.await
.to_owned();
let block_info = self
.blocks
.get(&blockhash)
.expect("Race Condition: Latest block not in block store")
.value()
.to_owned();
(blockhash, block_info)
}
pub async fn add_block(
&self,
blockhash: String,
block_info: BlockInformation,
commitment_config: CommitmentConfig,
) {
// Write to block store first in order to prevent
// any race condition i.e prevent some one to
// ask the map what it doesn't have rn
self.blocks.insert(blockhash.clone(), block_info);
*self.get_latest_blockhash(commitment_config).write().await = blockhash;
} }
} }

View File

@ -1,4 +1,5 @@
use crate::{ use crate::{
block_store::BlockStore,
configs::{IsBlockHashValidConfig, SendTransactionConfig}, configs::{IsBlockHashValidConfig, SendTransactionConfig},
encoding::BinaryEncoding, encoding::BinaryEncoding,
rpc::LiteRpcServer, rpc::LiteRpcServer,
@ -23,9 +24,7 @@ use solana_rpc_client_api::{
response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo}, response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
}; };
use solana_sdk::{ use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel}, commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey,
hash::Hash,
pubkey::Pubkey,
transaction::VersionedTransaction, transaction::VersionedTransaction,
}; };
use solana_transaction_status::TransactionStatus; use solana_transaction_status::TransactionStatus;
@ -42,14 +41,13 @@ pub struct LiteBridge {
// None if LiteBridge is not executed // None if LiteBridge is not executed
pub tx_send: Option<UnboundedSender<(String, WireTransaction, u64)>>, pub tx_send: Option<UnboundedSender<(String, WireTransaction, u64)>>,
pub tx_sender: TxSender, pub tx_sender: TxSender,
pub finalized_block_listener: BlockListener, pub block_listner: BlockListener,
pub confirmed_block_listener: BlockListener, pub block_store: BlockStore,
} }
impl LiteBridge { impl LiteBridge {
pub async fn new(rpc_url: String, ws_addr: String, fanout_slots: u64) -> anyhow::Result<Self> { pub async fn new(rpc_url: String, ws_addr: String, fanout_slots: u64) -> anyhow::Result<Self> {
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone())); let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
let pub_sub_client = Arc::new(PubsubClient::new(&ws_addr).await?); let pub_sub_client = Arc::new(PubsubClient::new(&ws_addr).await?);
let tpu_manager = let tpu_manager =
@ -57,40 +55,25 @@ impl LiteBridge {
let tx_sender = TxSender::new(tpu_manager.clone()); let tx_sender = TxSender::new(tpu_manager.clone());
let finalized_block_listener = BlockListener::new( let block_store = BlockStore::new(&rpc_client).await?;
let block_listner = BlockListener::new(
pub_sub_client.clone(), pub_sub_client.clone(),
rpc_client.clone(), rpc_client.clone(),
tx_sender.clone(), tx_sender.clone(),
CommitmentConfig::finalized(), block_store.clone(),
) );
.await?;
let confirmed_block_listener = BlockListener::new(
pub_sub_client,
rpc_client.clone(),
tx_sender.clone(),
CommitmentConfig::confirmed(),
)
.await?;
Ok(Self { Ok(Self {
rpc_client, rpc_client,
tpu_manager, tpu_manager,
tx_send: None, tx_send: None,
tx_sender, tx_sender,
finalized_block_listener, block_listner,
confirmed_block_listener, block_store,
}) })
} }
pub fn get_block_listner(&self, commitment_config: CommitmentConfig) -> BlockListener {
if let CommitmentLevel::Finalized = commitment_config.commitment {
self.finalized_block_listener.clone()
} else {
self.confirmed_block_listener.clone()
}
}
/// List for `JsonRpc` requests /// List for `JsonRpc` requests
pub async fn start_services<T: ToSocketAddrs + std::fmt::Debug + 'static + Send + Clone>( pub async fn start_services<T: ToSocketAddrs + std::fmt::Debug + 'static + Send + Clone>(
mut self, mut self,
@ -127,19 +110,17 @@ impl LiteBridge {
let metrics_capture = metrics_capture.capture(); let metrics_capture = metrics_capture.capture();
let finalized_block_listener = self let finalized_block_listener = self
.finalized_block_listener .block_listner
.clone() .clone()
.listen(postgres_send.clone()); .listen(CommitmentConfig::finalized(), postgres_send.clone());
let confirmed_block_listener = self.confirmed_block_listener.clone().listen(None); let confirmed_block_listener = self
let cleaner = Cleaner::new( .block_listner
self.tx_sender.clone(), .clone()
[ .listen(CommitmentConfig::confirmed(), None);
self.finalized_block_listener.clone(),
self.confirmed_block_listener.clone(), let cleaner =
], Cleaner::new(self.tx_sender.clone(), self.block_listner.clone()).start(clean_interval);
)
.start(clean_interval);
let rpc = self.into_rpc(); let rpc = self.into_rpc();
@ -219,11 +200,11 @@ impl LiteRpcServer for LiteBridge {
let sig = tx.get_signature(); let sig = tx.get_signature();
let Some(BlockInformation { slot, .. }) = self let Some(BlockInformation { slot, .. }) = self
.confirmed_block_listener .block_store
.get_block_info(&tx.get_recent_blockhash().to_string()) .get_block_info(&tx.get_recent_blockhash().to_string())
.await else { .await else {
log::warn!("block"); log::warn!("block");
return Err(jsonrpsee::core::Error::Custom("Blockhash not found in confirmed block store".to_string())); return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string()));
}; };
self.tx_send self.tx_send
@ -245,9 +226,10 @@ impl LiteRpcServer for LiteBridge {
CommitmentConfig::default() CommitmentConfig::default()
}; };
let block_listner = self.get_block_listner(commitment_config); let (blockhash, BlockInformation { slot, block_height }) = self
let (blockhash, BlockInformation { slot, block_height }) = .block_store
block_listner.get_latest_block_info().await; .get_latest_block_info(commitment_config)
.await;
Ok(RpcResponse { Ok(RpcResponse {
context: RpcResponseContext { context: RpcResponseContext {
@ -276,8 +258,6 @@ impl LiteRpcServer for LiteBridge {
} }
}; };
let block_listner = self.get_block_listner(commitment);
let is_valid = match self let is_valid = match self
.rpc_client .rpc_client
.is_blockhash_valid(&blockhash, commitment) .is_blockhash_valid(&blockhash, commitment)
@ -289,7 +269,12 @@ impl LiteRpcServer for LiteBridge {
} }
}; };
let slot = block_listner.get_latest_block_info().await.1.slot; let slot = self
.block_store
.get_latest_block_info(commitment)
.await
.1
.slot;
Ok(RpcResponse { Ok(RpcResponse {
context: RpcResponseContext { context: RpcResponseContext {
@ -318,8 +303,8 @@ impl LiteRpcServer for LiteBridge {
Ok(RpcResponse { Ok(RpcResponse {
context: RpcResponseContext { context: RpcResponseContext {
slot: self slot: self
.finalized_block_listener .block_store
.get_latest_block_info() .get_latest_block_info(CommitmentConfig::finalized())
.await .await
.1 .1
.slot, .slot,
@ -372,11 +357,10 @@ impl LiteRpcServer for LiteBridge {
&self, &self,
mut sink: SubscriptionSink, mut sink: SubscriptionSink,
signature: String, signature: String,
commitment_config: CommitmentConfig, _commitment_config: CommitmentConfig,
) -> SubscriptionResult { ) -> SubscriptionResult {
sink.accept()?; sink.accept()?;
self.get_block_listner(commitment_config) self.block_listner.signature_subscribe(signature, sink);
.signature_subscribe(signature, sink);
Ok(()) Ok(())
} }
} }

View File

@ -29,5 +29,5 @@ pub struct Args {
pub clean_interval_ms: u64, pub clean_interval_ms: u64,
/// addr to postgres /// addr to postgres
#[arg(short = 'p', long)] #[arg(short = 'p', long)]
pub enable_postgres: bool pub enable_postgres: bool,
} }

View File

@ -1,6 +1,7 @@
use const_env::from_env; use const_env::from_env;
use solana_transaction_status::TransactionConfirmationStatus; use solana_transaction_status::TransactionConfirmationStatus;
pub mod block_store;
pub mod bridge; pub mod bridge;
pub mod cli; pub mod cli;
pub mod configs; pub mod configs;
@ -9,7 +10,6 @@ pub mod errors;
pub mod rpc; pub mod rpc;
pub mod tpu_manager; pub mod tpu_manager;
pub mod workers; pub mod workers;
pub mod block_store;
#[from_env] #[from_env]
pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899"; pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899";

View File

@ -16,7 +16,7 @@ pub struct TpuManager {
error_count: Arc<AtomicU32>, error_count: Arc<AtomicU32>,
rpc_client: Arc<RpcClient>, rpc_client: Arc<RpcClient>,
tpu_client: Arc<RwLock<QuicTpuClient>>, tpu_client: Arc<RwLock<QuicTpuClient>>,
ws_addr: String, pub ws_addr: String,
fanout_slots: u64, fanout_slots: u64,
} }

View File

@ -19,11 +19,14 @@ use solana_transaction_status::{
TransactionStatus, UiConfirmedBlock, UiTransactionStatusMeta, TransactionStatus, UiConfirmedBlock, UiTransactionStatusMeta,
}; };
use tokio::{ use tokio::{
sync::{mpsc::Sender, RwLock}, sync::{mpsc::Sender},
task::JoinHandle, task::JoinHandle,
}; };
use crate::workers::{PostgresBlock, PostgresMsg, PostgresUpdateTx}; use crate::{
block_store::{BlockStore},
workers::{PostgresBlock, PostgresMsg, PostgresUpdateTx},
};
use super::{PostgresMpscSend, TxProps, TxSender}; use super::{PostgresMpscSend, TxProps, TxSender};
@ -32,10 +35,8 @@ use super::{PostgresMpscSend, TxProps, TxSender};
#[derive(Clone)] #[derive(Clone)]
pub struct BlockListener { pub struct BlockListener {
pub_sub_client: Arc<PubsubClient>, pub_sub_client: Arc<PubsubClient>,
commitment_config: CommitmentConfig,
tx_sender: TxSender, tx_sender: TxSender,
block_store: Arc<DashMap<String, BlockInformation>>, block_store: BlockStore,
latest_block_hash: Arc<RwLock<String>>,
pub signature_subscribers: Arc<DashMap<String, SubscriptionSink>>, pub signature_subscribers: Arc<DashMap<String, SubscriptionSink>>,
} }
@ -51,25 +52,18 @@ pub struct BlockListnerNotificatons {
} }
impl BlockListener { impl BlockListener {
pub async fn new( pub fn new(
pub_sub_client: Arc<PubsubClient>, pub_sub_client: Arc<PubsubClient>,
rpc_client: Arc<RpcClient>, _rpc_client: Arc<RpcClient>,
tx_sender: TxSender, tx_sender: TxSender,
commitment_config: CommitmentConfig, block_store: BlockStore,
) -> anyhow::Result<Self> { ) -> Self {
Self {
Ok(Self {
pub_sub_client, pub_sub_client,
tx_sender, tx_sender,
latest_block_hash: Arc::new(RwLock::new(latest_block_hash.clone())), block_store,
block_store: Arc::new({
let map = DashMap::new();
map.insert(latest_block_hash, BlockInformation { slot, block_height });
map
}),
commitment_config,
signature_subscribers: Default::default(), signature_subscribers: Default::default(),
}) }
} }
pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize { pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize {
@ -82,31 +76,6 @@ impl BlockListener {
num_of_sigs_commited num_of_sigs_commited
} }
pub async fn get_latest_block_info(&self) -> (String, BlockInformation) {
let blockhash = &*self.latest_block_hash.read().await;
(
blockhash.to_owned(),
self.block_store
.get(blockhash)
.expect("Race Condition: Latest block not in block store")
.value()
.to_owned(),
)
}
pub async fn get_block_info(&self, blockhash: &str) -> Option<BlockInformation> {
let Some(info) = self.block_store.get(blockhash) else {
return None;
};
Some(info.value().to_owned())
}
pub async fn get_latest_blockhash(&self) -> String {
self.latest_block_hash.read().await.to_owned()
}
pub fn signature_subscribe(&self, signature: String, sink: SubscriptionSink) { pub fn signature_subscribe(&self, signature: String, sink: SubscriptionSink) {
let _ = self.signature_subscribers.insert(signature, sink); let _ = self.signature_subscribers.insert(signature, sink);
} }
@ -115,9 +84,13 @@ impl BlockListener {
self.signature_subscribers.remove(&signature); self.signature_subscribers.remove(&signature);
} }
pub fn listen(self, postgres: Option<PostgresMpscSend>) -> JoinHandle<anyhow::Result<()>> { pub fn listen(
self,
commitment_config: CommitmentConfig,
postgres: Option<PostgresMpscSend>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move { tokio::spawn(async move {
let commitment = self.commitment_config.commitment; let commitment = commitment_config.commitment;
let comfirmation_status = match commitment { let comfirmation_status = match commitment {
CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized, CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
@ -131,7 +104,7 @@ impl BlockListener {
.block_subscribe( .block_subscribe(
RpcBlockSubscribeFilter::All, RpcBlockSubscribeFilter::All,
Some(RpcBlockSubscribeConfig { Some(RpcBlockSubscribeConfig {
commitment: Some(self.commitment_config), commitment: Some(commitment_config),
encoding: None, encoding: None,
transaction_details: Some( transaction_details: Some(
solana_transaction_status::TransactionDetails::Full, solana_transaction_status::TransactionDetails::Full,
@ -164,12 +137,13 @@ impl BlockListener {
let parent_slot = block.parent_slot; let parent_slot = block.parent_slot;
// Write to block store first in order to prevent
// any race condition i.e prevent some one to
// ask the map what it doesn't have rn
self.block_store self.block_store
.insert(blockhash.clone(), BlockInformation { slot, block_height }); .add_block(
*self.latest_block_hash.write().await = blockhash; blockhash.clone(),
BlockInformation { slot, block_height },
commitment_config,
)
.await;
if let Some(postgres) = &postgres { if let Some(postgres) = &postgres {
let Some(rewards) = block.rewards else { let Some(rewards) = block.rewards else {
@ -182,12 +156,12 @@ impl BlockListener {
continue; continue;
}; };
let leader_id = &leader_reward.pubkey; let _leader_id = &leader_reward.pubkey;
postgres postgres
.send(PostgresMsg::PostgresBlock(PostgresBlock { .send(PostgresMsg::PostgresBlock(PostgresBlock {
slot: slot as i64, slot: slot as i64,
leader_id, //FIX: leader_id: 0, //FIX:
parent_slot: parent_slot as i64, parent_slot: parent_slot as i64,
})) }))
.expect("Error sending block to postgres service"); .expect("Error sending block to postgres service");

View File

@ -7,16 +7,16 @@ use super::{BlockListener, TxSender};
/// Background worker which cleans up memory /// Background worker which cleans up memory
#[derive(Clone)] #[derive(Clone)]
pub struct Cleaner<const N: usize> { pub struct Cleaner {
tx_sender: TxSender, tx_sender: TxSender,
block_listeners: [BlockListener; N], block_listenser: BlockListener,
} }
impl<const N: usize> Cleaner<N> { impl Cleaner {
pub fn new(tx_sender: TxSender, block_listeners: [BlockListener; N]) -> Self { pub fn new(tx_sender: TxSender, block_listenser: BlockListener) -> Self {
Self { Self {
tx_sender, tx_sender,
block_listeners, block_listenser,
} }
} }
@ -38,22 +38,20 @@ impl<const N: usize> Cleaner<N> {
/// Clean Signature Subscribers from Block Listeners /// Clean Signature Subscribers from Block Listeners
pub fn clean_block_listeners(&self) { pub fn clean_block_listeners(&self) {
for block_listenser in &self.block_listeners {
let mut to_remove = vec![]; let mut to_remove = vec![];
for subscriber in block_listenser.signature_subscribers.iter() { for subscriber in self.block_listenser.signature_subscribers.iter() {
if subscriber.value().is_closed() { if subscriber.value().is_closed() {
to_remove.push(subscriber.key().to_owned()); to_remove.push(subscriber.key().to_owned());
} }
} }
for to_remove in &to_remove { for to_remove in &to_remove {
block_listenser.signature_subscribers.remove(to_remove); self.block_listenser.signature_subscribers.remove(to_remove);
} }
info!("Cleaned {} Signature Subscribers", to_remove.len()); info!("Cleaned {} Signature Subscribers", to_remove.len());
} }
}
pub fn start(self, ttl_duration: Duration) -> JoinHandle<anyhow::Result<()>> { pub fn start(self, ttl_duration: Duration) -> JoinHandle<anyhow::Result<()>> {
let mut ttl = tokio::time::interval(ttl_duration); let mut ttl = tokio::time::interval(ttl_duration);

View File

@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration};
use bench::helpers::BenchHelper; use bench::helpers::BenchHelper;
use futures::future::try_join_all; use futures::future::try_join_all;
use lite_rpc::{ use lite_rpc::{
block_store::BlockStore,
encoding::BinaryEncoding, encoding::BinaryEncoding,
tpu_manager::TpuManager, tpu_manager::TpuManager,
workers::{BlockListener, TxSender}, workers::{BlockListener, TxSender},
@ -35,20 +36,21 @@ async fn send_and_confirm_txs() {
let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await.unwrap()); let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await.unwrap());
let tx_sender = TxSender::new(tpu_client); let tx_sender = TxSender::new(tpu_client);
let block_store = BlockStore::new(&rpc_client).await.unwrap();
let block_listener = BlockListener::new( let block_listener = BlockListener::new(
pub_sub_client.clone(), pub_sub_client.clone(),
rpc_client.clone(), rpc_client.clone(),
tx_sender.clone(), tx_sender.clone(),
CommitmentConfig::confirmed(), block_store,
) );
.await
.unwrap();
let (tx_send, tx_recv) = mpsc::unbounded_channel(); let (tx_send, tx_recv) = mpsc::unbounded_channel();
let services = try_join_all(vec![ let services = try_join_all(vec![
block_listener.clone().listen(None), block_listener
.clone()
.listen(CommitmentConfig::confirmed(), None),
tx_sender.clone().execute( tx_sender.clone().execute(
tx_recv, tx_recv,
DEFAULT_TX_BATCH_SIZE, DEFAULT_TX_BATCH_SIZE,