Merge pull request #47 from blockworks-foundation/block_store

Unified Block store, more metrics and websocket auto fix
This commit is contained in:
Aniket Prajapati 2023-02-06 18:39:44 +05:30 committed by GitHub
commit 1e60c090ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1420 additions and 876 deletions

1451
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -13,20 +13,24 @@ members = [
bench = { path = "./bench" } bench = { path = "./bench" }
[dependencies] [dependencies]
solana-client = "1.14.13" solana-sdk = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-sdk = "1.14.13" solana-rpc-client = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-transaction-status = "1.14.13" solana-rpc-client-api= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-version = "1.14.13" solana-tpu-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-quic-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-pubsub-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-transaction-status = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-version= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.91" serde_json = "1.0.92"
tokio = { version = "1.25.0", features = ["full"]} tokio = { version = "1.25.0", features = ["full"]}
bincode = "1.3.3" bincode = "1.3.3"
bs58 = "0.4.0" bs58 = "0.4.0"
base64 = "0.21.0" base64 = "0.21.0"
thiserror = "1.0.38" thiserror = "1.0.38"
futures = "0.3.26" futures = "0.3.26"
bytes = "1.3.0" bytes = "1.4.0"
anyhow = "1.0.68" anyhow = "1.0.69"
log = "0.4.17" log = "0.4.17"
clap = { version = "4.1.4", features = ["derive"] } clap = { version = "4.1.4", features = ["derive"] }
dashmap = "5.4.0" dashmap = "5.4.0"
@ -36,4 +40,5 @@ tracing-subscriber = "0.3.16"
tokio-postgres = "0.7.7" tokio-postgres = "0.7.7"
native-tls = "0.2.11" native-tls = "0.2.11"
postgres-native-tls = "0.5.0" postgres-native-tls = "0.5.0"
serde_prometheus = "0.1.6" prometheus = "0.13.3"
lazy_static = "1.4.0"

View File

@ -4,12 +4,12 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
solana-client = "1.14.13" solana-sdk = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-sdk = "1.14.13" solana-rpc-client = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
log = "0.4.17" log = "0.4.17"
anyhow = "1.0.68" anyhow = "1.0.69"
serde = "1.0.152" serde = "1.0.152"
serde_json = "1.0.91" serde_json = "1.0.92"
csv = "1.1.6" csv = "1.1.6"
clap = { version = "4.1.4", features = ["derive"] } clap = { version = "4.1.4", features = ["derive"] }
tokio = { version = "1.25.0", features = ["full", "fs"]} tokio = { version = "1.25.0", features = ["full", "fs"]}

View File

@ -1,7 +1,7 @@
use std::{ops::Deref, sync::Arc}; use std::{ops::Deref, sync::Arc};
use anyhow::Context; use anyhow::Context;
use solana_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{ use solana_sdk::{
commitment_config::CommitmentConfig, commitment_config::CommitmentConfig,
hash::Hash, hash::Hash,

View File

@ -11,7 +11,7 @@ use bench::{
}; };
use clap::Parser; use clap::Parser;
use log::info; use log::info;
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction}; use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
#[tokio::main] #[tokio::main]

101
src/block_store.rs Normal file
View File

@ -0,0 +1,101 @@
use std::sync::Arc;
use dashmap::DashMap;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::sync::RwLock;
use crate::workers::BlockInformation;
#[derive(Clone)]
pub struct BlockStore {
blocks: Arc<DashMap<String, BlockInformation>>,
latest_confirmed_blockhash: Arc<RwLock<String>>,
latest_finalized_blockhash: Arc<RwLock<String>>,
}
impl BlockStore {
pub async fn new(rpc_client: &RpcClient) -> anyhow::Result<Self> {
let (confirmed_blockhash, confirmed_block) =
Self::fetch_latest(rpc_client, CommitmentConfig::confirmed()).await?;
let (finalized_blockhash, finalized_block) =
Self::fetch_latest(rpc_client, CommitmentConfig::finalized()).await?;
Ok(Self {
latest_confirmed_blockhash: Arc::new(RwLock::new(confirmed_blockhash.clone())),
latest_finalized_blockhash: Arc::new(RwLock::new(finalized_blockhash.clone())),
blocks: Arc::new({
let map = DashMap::new();
map.insert(confirmed_blockhash, confirmed_block);
map.insert(finalized_blockhash, finalized_block);
map
}),
})
}
pub async fn fetch_latest(
rpc_client: &RpcClient,
commitment_config: CommitmentConfig,
) -> anyhow::Result<(String, BlockInformation)> {
let (latest_block_hash, block_height) = rpc_client
.get_latest_blockhash_with_commitment(commitment_config)
.await?;
let latest_block_hash = latest_block_hash.to_string();
let slot = rpc_client
.get_slot_with_commitment(commitment_config)
.await?;
Ok((latest_block_hash, BlockInformation { slot, block_height }))
}
pub async fn get_block_info(&self, blockhash: &str) -> Option<BlockInformation> {
let Some(info) = self.blocks.get(blockhash) else {
return None;
};
Some(info.value().to_owned())
}
pub fn get_latest_blockhash(&self, commitment_config: CommitmentConfig) -> Arc<RwLock<String>> {
if commitment_config.is_finalized() {
self.latest_finalized_blockhash.clone()
} else {
self.latest_confirmed_blockhash.clone()
}
}
pub async fn get_latest_block_info(
&self,
commitment_config: CommitmentConfig,
) -> (String, BlockInformation) {
let blockhash = self
.get_latest_blockhash(commitment_config)
.read()
.await
.to_owned();
let block_info = self
.blocks
.get(&blockhash)
.expect("Race Condition: Latest block not in block store")
.value()
.to_owned();
(blockhash, block_info)
}
pub async fn add_block(
&self,
blockhash: String,
block_info: BlockInformation,
commitment_config: CommitmentConfig,
) {
// Write to block store first in order to prevent
// any race condition i.e prevent some one to
// ask the map what it doesn't have rn
self.blocks.insert(blockhash.clone(), block_info);
*self.get_latest_blockhash(commitment_config).write().await = blockhash;
}
}

View File

@ -1,4 +1,5 @@
use crate::{ use crate::{
block_store::BlockStore,
configs::{IsBlockHashValidConfig, SendTransactionConfig}, configs::{IsBlockHashValidConfig, SendTransactionConfig},
encoding::BinaryEncoding, encoding::BinaryEncoding,
rpc::LiteRpcServer, rpc::LiteRpcServer,
@ -16,16 +17,15 @@ use anyhow::bail;
use log::info; use log::info;
use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink}; use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink};
use solana_client::{
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient}, use prometheus::{opts, register_counter, Counter};
rpc_client::SerializableTransaction, use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
rpc_config::{RpcContextConfig, RpcRequestAirdropConfig}, use solana_rpc_client_api::{
rpc_response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo}, config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig},
response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
}; };
use solana_sdk::{ use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel}, commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey,
hash::Hash,
pubkey::Pubkey,
transaction::VersionedTransaction, transaction::VersionedTransaction,
}; };
use solana_transaction_status::TransactionStatus; use solana_transaction_status::TransactionStatus;
@ -35,6 +35,24 @@ use tokio::{
task::JoinHandle, task::JoinHandle,
}; };
lazy_static::lazy_static! {
static ref RPC_SEND_TX: Counter =
register_counter!(opts!("rpc_send_tx", "RPC call send transaction")).unwrap();
static ref RPC_GET_LATEST_BLOCKHASH: Counter =
register_counter!(opts!("rpc_get_latest_blockhash", "RPC call to get latest block hash")).unwrap();
static ref RPC_IS_BLOCKHASH_VALID: Counter =
register_counter!(opts!("rpc_is_blockhash_valid", "RPC call to check if blockhash is vali calld")).unwrap();
static ref RPC_GET_SIGNATURE_STATUSES: Counter =
register_counter!(opts!("rpc_get_signature_statuses", "RPC call to get signature statuses")).unwrap();
static ref RPC_GET_VERSION: Counter =
register_counter!(opts!("rpc_get_version", "RPC call to version")).unwrap();
static ref RPC_REQUEST_AIRDROP: Counter =
register_counter!(opts!("rpc_airdrop", "RPC call to request airdrop")).unwrap();
static ref RPC_SIGNATURE_SUBSCRIBE: Counter =
register_counter!(opts!("rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap();
}
/// A bridge between clients and tpu /// A bridge between clients and tpu
pub struct LiteBridge { pub struct LiteBridge {
pub rpc_client: Arc<RpcClient>, pub rpc_client: Arc<RpcClient>,
@ -42,56 +60,35 @@ pub struct LiteBridge {
// None if LiteBridge is not executed // None if LiteBridge is not executed
pub tx_send: Option<UnboundedSender<(String, WireTransaction, u64)>>, pub tx_send: Option<UnboundedSender<(String, WireTransaction, u64)>>,
pub tx_sender: TxSender, pub tx_sender: TxSender,
pub finalized_block_listener: BlockListener, pub block_listner: BlockListener,
pub confirmed_block_listener: BlockListener, pub block_store: BlockStore,
} }
impl LiteBridge { impl LiteBridge {
pub async fn new(rpc_url: String, ws_addr: String, fanout_slots: u64) -> anyhow::Result<Self> { pub async fn new(rpc_url: String, ws_addr: String, fanout_slots: u64) -> anyhow::Result<Self> {
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone())); let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
let pub_sub_client = Arc::new(PubsubClient::new(&ws_addr).await?);
let tpu_manager = let tpu_manager =
Arc::new(TpuManager::new(rpc_client.clone(), ws_addr, fanout_slots).await?); Arc::new(TpuManager::new(rpc_client.clone(), ws_addr, fanout_slots).await?);
let tx_sender = TxSender::new(tpu_manager.clone()); let tx_sender = TxSender::new(tpu_manager.clone());
let finalized_block_listener = BlockListener::new( let block_store = BlockStore::new(&rpc_client).await?;
pub_sub_client.clone(),
rpc_client.clone(),
tx_sender.clone(),
CommitmentConfig::finalized(),
)
.await?;
let confirmed_block_listener = BlockListener::new( let block_listner = BlockListener::new(tx_sender.clone(), block_store.clone());
pub_sub_client,
rpc_client.clone(),
tx_sender.clone(),
CommitmentConfig::confirmed(),
)
.await?;
Ok(Self { Ok(Self {
rpc_client, rpc_client,
tpu_manager, tpu_manager,
tx_send: None, tx_send: None,
tx_sender, tx_sender,
finalized_block_listener, block_listner,
confirmed_block_listener, block_store,
}) })
} }
pub fn get_block_listner(&self, commitment_config: CommitmentConfig) -> BlockListener {
if let CommitmentLevel::Finalized = commitment_config.commitment {
self.finalized_block_listener.clone()
} else {
self.confirmed_block_listener.clone()
}
}
/// List for `JsonRpc` requests /// List for `JsonRpc` requests
#[allow(clippy::too_many_arguments)]
pub async fn start_services<T: ToSocketAddrs + std::fmt::Debug + 'static + Send + Clone>( pub async fn start_services<T: ToSocketAddrs + std::fmt::Debug + 'static + Send + Clone>(
mut self, mut self,
http_addr: T, http_addr: T,
@ -100,6 +97,7 @@ impl LiteBridge {
tx_send_interval: Duration, tx_send_interval: Duration,
clean_interval: Duration, clean_interval: Duration,
enable_postgres: bool, enable_postgres: bool,
prometheus_addr: T,
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> { ) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
let (postgres, postgres_send) = if enable_postgres { let (postgres, postgres_send) = if enable_postgres {
let (postgres_send, postgres_recv) = mpsc::unbounded_channel(); let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
@ -122,24 +120,21 @@ impl LiteBridge {
postgres_send.clone(), postgres_send.clone(),
); );
let metrics_capture = MetricsCapture::new(self.tx_sender.clone()); let metrics_capture = MetricsCapture::new(self.tx_sender.clone()).capture();
let prometheus_sync = PrometheusSync::new(metrics_capture.clone()).sync(); let prometheus_sync = PrometheusSync.sync(prometheus_addr);
let metrics_capture = metrics_capture.capture();
let finalized_block_listener = self let finalized_block_listener = self
.finalized_block_listener .block_listner
.clone() .clone()
.listen(postgres_send.clone()); .listen(CommitmentConfig::finalized(), postgres_send.clone());
let confirmed_block_listener = self.confirmed_block_listener.clone().listen(None); let confirmed_block_listener = self
let cleaner = Cleaner::new( .block_listner
self.tx_sender.clone(), .clone()
[ .listen(CommitmentConfig::confirmed(), None);
self.finalized_block_listener.clone(),
self.confirmed_block_listener.clone(), let cleaner =
], Cleaner::new(self.tx_sender.clone(), self.block_listner.clone()).start(clean_interval);
)
.start(clean_interval);
let rpc = self.into_rpc(); let rpc = self.into_rpc();
@ -198,6 +193,8 @@ impl LiteRpcServer for LiteBridge {
tx: String, tx: String,
send_transaction_config: Option<SendTransactionConfig>, send_transaction_config: Option<SendTransactionConfig>,
) -> crate::rpc::Result<String> { ) -> crate::rpc::Result<String> {
RPC_SEND_TX.inc();
let SendTransactionConfig { let SendTransactionConfig {
encoding, encoding,
max_retries: _, max_retries: _,
@ -219,11 +216,11 @@ impl LiteRpcServer for LiteBridge {
let sig = tx.get_signature(); let sig = tx.get_signature();
let Some(BlockInformation { slot, .. }) = self let Some(BlockInformation { slot, .. }) = self
.confirmed_block_listener .block_store
.get_block_info(&tx.get_recent_blockhash().to_string()) .get_block_info(&tx.get_recent_blockhash().to_string())
.await else { .await else {
log::warn!("block"); log::warn!("block");
return Err(jsonrpsee::core::Error::Custom("Blockhash not found in confirmed block store".to_string())); return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string()));
}; };
self.tx_send self.tx_send
@ -237,17 +234,18 @@ impl LiteRpcServer for LiteBridge {
async fn get_latest_blockhash( async fn get_latest_blockhash(
&self, &self,
config: Option<solana_client::rpc_config::RpcContextConfig>, config: Option<RpcContextConfig>,
) -> crate::rpc::Result<RpcResponse<solana_client::rpc_response::RpcBlockhash>> { ) -> crate::rpc::Result<RpcResponse<RpcBlockhash>> {
let commitment_config = if let Some(RpcContextConfig { commitment, .. }) = config { RPC_GET_LATEST_BLOCKHASH.inc();
commitment.unwrap_or_default()
} else {
CommitmentConfig::default()
};
let block_listner = self.get_block_listner(commitment_config); let commitment_config = config
let (blockhash, BlockInformation { slot, block_height }) = .map(|config| config.commitment.unwrap_or_default())
block_listner.get_latest_block_info().await; .unwrap_or_default();
let (blockhash, BlockInformation { slot, block_height }) = self
.block_store
.get_latest_block_info(commitment_config)
.await;
Ok(RpcResponse { Ok(RpcResponse {
context: RpcResponseContext { context: RpcResponseContext {
@ -266,6 +264,8 @@ impl LiteRpcServer for LiteBridge {
blockhash: String, blockhash: String,
config: Option<IsBlockHashValidConfig>, config: Option<IsBlockHashValidConfig>,
) -> crate::rpc::Result<RpcResponse<bool>> { ) -> crate::rpc::Result<RpcResponse<bool>> {
RPC_IS_BLOCKHASH_VALID.inc();
let commitment = config.unwrap_or_default().commitment.unwrap_or_default(); let commitment = config.unwrap_or_default().commitment.unwrap_or_default();
let commitment = CommitmentConfig { commitment }; let commitment = CommitmentConfig { commitment };
@ -276,8 +276,6 @@ impl LiteRpcServer for LiteBridge {
} }
}; };
let block_listner = self.get_block_listner(commitment);
let is_valid = match self let is_valid = match self
.rpc_client .rpc_client
.is_blockhash_valid(&blockhash, commitment) .is_blockhash_valid(&blockhash, commitment)
@ -289,7 +287,12 @@ impl LiteRpcServer for LiteBridge {
} }
}; };
let slot = block_listner.get_latest_block_info().await.1.slot; let slot = self
.block_store
.get_latest_block_info(commitment)
.await
.1
.slot;
Ok(RpcResponse { Ok(RpcResponse {
context: RpcResponseContext { context: RpcResponseContext {
@ -303,8 +306,10 @@ impl LiteRpcServer for LiteBridge {
async fn get_signature_statuses( async fn get_signature_statuses(
&self, &self,
sigs: Vec<String>, sigs: Vec<String>,
_config: Option<solana_client::rpc_config::RpcSignatureStatusConfig>, _config: Option<RpcSignatureStatusConfig>,
) -> crate::rpc::Result<RpcResponse<Vec<Option<TransactionStatus>>>> { ) -> crate::rpc::Result<RpcResponse<Vec<Option<TransactionStatus>>>> {
RPC_GET_SIGNATURE_STATUSES.inc();
let sig_statuses = sigs let sig_statuses = sigs
.iter() .iter()
.map(|sig| { .map(|sig| {
@ -318,8 +323,8 @@ impl LiteRpcServer for LiteBridge {
Ok(RpcResponse { Ok(RpcResponse {
context: RpcResponseContext { context: RpcResponseContext {
slot: self slot: self
.finalized_block_listener .block_store
.get_latest_block_info() .get_latest_block_info(CommitmentConfig::finalized())
.await .await
.1 .1
.slot, .slot,
@ -330,6 +335,8 @@ impl LiteRpcServer for LiteBridge {
} }
fn get_version(&self) -> crate::rpc::Result<RpcVersionInfo> { fn get_version(&self) -> crate::rpc::Result<RpcVersionInfo> {
RPC_GET_VERSION.inc();
let version = solana_version::Version::default(); let version = solana_version::Version::default();
Ok(RpcVersionInfo { Ok(RpcVersionInfo {
solana_core: version.to_string(), solana_core: version.to_string(),
@ -343,6 +350,8 @@ impl LiteRpcServer for LiteBridge {
lamports: u64, lamports: u64,
config: Option<RpcRequestAirdropConfig>, config: Option<RpcRequestAirdropConfig>,
) -> crate::rpc::Result<String> { ) -> crate::rpc::Result<String> {
RPC_REQUEST_AIRDROP.inc();
let pubkey = match Pubkey::from_str(&pubkey_str) { let pubkey = match Pubkey::from_str(&pubkey_str) {
Ok(pubkey) => pubkey, Ok(pubkey) => pubkey,
Err(err) => { Err(err) => {
@ -372,11 +381,11 @@ impl LiteRpcServer for LiteBridge {
&self, &self,
mut sink: SubscriptionSink, mut sink: SubscriptionSink,
signature: String, signature: String,
commitment_config: CommitmentConfig, _commitment_config: CommitmentConfig,
) -> SubscriptionResult { ) -> SubscriptionResult {
RPC_SIGNATURE_SUBSCRIBE.inc();
sink.accept()?; sink.accept()?;
self.get_block_listner(commitment_config) self.block_listner.signature_subscribe(signature, sink);
.signature_subscribe(signature, sink);
Ok(()) Ok(())
} }
} }

View File

@ -27,7 +27,10 @@ pub struct Args {
/// interval between clean /// interval between clean
#[arg(short = 'c', long, default_value_t = DEFAULT_CLEAN_INTERVAL_MS)] #[arg(short = 'c', long, default_value_t = DEFAULT_CLEAN_INTERVAL_MS)]
pub clean_interval_ms: u64, pub clean_interval_ms: u64,
/// addr to postgres /// enable logging to postgres
#[arg(short = 'p', long)] #[arg(short = 'p', long)]
pub enable_postgres: bool pub enable_postgres: bool,
/// enable metrics to prometheus at addr
#[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))]
pub prometheus_addr: String,
} }

View File

@ -1,6 +1,7 @@
use const_env::from_env; use const_env::from_env;
use solana_transaction_status::TransactionConfirmationStatus; use solana_transaction_status::TransactionConfirmationStatus;
pub mod block_store;
pub mod bridge; pub mod bridge;
pub mod cli; pub mod cli;
pub mod configs; pub mod configs;

View File

@ -19,6 +19,7 @@ pub async fn main() -> anyhow::Result<()> {
clean_interval_ms, clean_interval_ms,
fanout_size, fanout_size,
enable_postgres, enable_postgres,
prometheus_addr,
} = Args::parse(); } = Args::parse();
let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms); let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms);
@ -34,6 +35,7 @@ pub async fn main() -> anyhow::Result<()> {
tx_batch_interval_ms, tx_batch_interval_ms,
clean_interval_ms, clean_interval_ms,
enable_postgres, enable_postgres,
prometheus_addr,
) )
.await?; .await?;

View File

@ -1,8 +1,8 @@
use jsonrpsee::proc_macros::rpc; use jsonrpsee::proc_macros::rpc;
use solana_client::rpc_config::{ use solana_rpc_client_api::config::{
RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig, RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig,
}; };
use solana_client::rpc_response::{Response as RpcResponse, RpcBlockhash, RpcVersionInfo}; use solana_rpc_client_api::response::{Response as RpcResponse, RpcBlockhash, RpcVersionInfo};
use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionStatus; use solana_transaction_status::TransactionStatus;

View File

@ -4,18 +4,19 @@ use std::sync::{
}; };
use log::info; use log::info;
use solana_client::{ use solana_quic_client::QuicPool;
nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient}, use solana_rpc_client::nonblocking::rpc_client::RpcClient;
tpu_client::TpuClientConfig, use solana_tpu_client::{nonblocking::tpu_client::TpuClient, tpu_client::TpuClientConfig};
};
use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::sync::{RwLock, RwLockReadGuard};
pub type QuicTpuClient = TpuClient<QuicPool>;
#[derive(Clone)] #[derive(Clone)]
pub struct TpuManager { pub struct TpuManager {
error_count: Arc<AtomicU32>, error_count: Arc<AtomicU32>,
rpc_client: Arc<RpcClient>, rpc_client: Arc<RpcClient>,
tpu_client: Arc<RwLock<TpuClient>>, tpu_client: Arc<RwLock<QuicTpuClient>>,
ws_addr: String, pub ws_addr: String,
fanout_slots: u64, fanout_slots: u64,
} }
@ -41,7 +42,7 @@ impl TpuManager {
rpc_client: Arc<RpcClient>, rpc_client: Arc<RpcClient>,
ws_addr: &str, ws_addr: &str,
fanout_slots: u64, fanout_slots: u64,
) -> anyhow::Result<TpuClient> { ) -> anyhow::Result<QuicTpuClient> {
Ok(TpuClient::new( Ok(TpuClient::new(
rpc_client.clone(), rpc_client.clone(),
ws_addr, ws_addr,
@ -84,7 +85,7 @@ impl TpuManager {
} }
} }
pub async fn get_tpu_client(&self) -> RwLockReadGuard<TpuClient> { pub async fn get_tpu_client(&self) -> RwLockReadGuard<QuicTpuClient> {
self.tpu_client.read().await self.tpu_client.read().await
} }
} }

View File

@ -4,41 +4,65 @@ use anyhow::{bail, Context};
use dashmap::DashMap; use dashmap::DashMap;
use futures::StreamExt; use futures::StreamExt;
use jsonrpsee::SubscriptionSink; use jsonrpsee::SubscriptionSink;
use log::info; use log::{info, warn};
use solana_client::{ use prometheus::{histogram_opts, opts, register_counter, register_histogram, Counter, Histogram};
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient}, use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
rpc_client::SerializableTransaction, use solana_rpc_client::rpc_client::SerializableTransaction;
rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter}, use solana_rpc_client_api::{
rpc_response::{Response as RpcResponse, RpcResponseContext}, config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
response::{Response as RpcResponse, RpcResponseContext},
}; };
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_transaction_status::{ use solana_transaction_status::{
option_serializer::OptionSerializer, TransactionConfirmationStatus, TransactionStatus, option_serializer::OptionSerializer, RewardType, TransactionConfirmationStatus,
UiConfirmedBlock, UiTransactionStatusMeta, TransactionStatus, UiConfirmedBlock, UiTransactionStatusMeta,
};
use tokio::{
sync::{mpsc::Sender, RwLock},
task::JoinHandle,
}; };
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use crate::workers::{PostgresBlock, PostgresMsg, PostgresUpdateTx}; use crate::{
block_store::BlockStore,
workers::{PostgresBlock, PostgresMsg, PostgresUpdateTx},
};
use super::{PostgresMpscSend, TxProps, TxSender}; use super::{PostgresMpscSend, TxProps, TxSender};
lazy_static::lazy_static! {
static ref TT_RECV_CON_BLOCK: Histogram = register_histogram!(histogram_opts!(
"tt_recv_con_block",
"Time to receive confirmed block from block subscribe",
))
.unwrap();
static ref TT_RECV_FIN_BLOCK: Histogram = register_histogram!(histogram_opts!(
"tt_recv_fin_block",
"Time to receive finalized block from block subscribe",
))
.unwrap();
static ref FIN_BLOCKS_RECV: Counter =
register_counter!(opts!("fin_blocks_recv", "Number of Finalized Blocks Received")).unwrap();
static ref CON_BLOCKS_RECV: Counter =
register_counter!(opts!("con_blocks_recv", "Number of Confirmed Blocks Received")).unwrap();
static ref INCOMPLETE_FIN_BLOCKS_RECV: Counter =
register_counter!(opts!("incomplete_fin_blocks_recv", "Number of Incomplete Finalized Blocks Received")).unwrap();
static ref INCOMPLETE_CON_BLOCKS_RECV: Counter =
register_counter!(opts!("incomplete_con_blocks_recv", "Number of Incomplete Confirmed Blocks Received")).unwrap();
static ref TXS_CONFIRMED: Counter =
register_counter!(opts!("txs_confirmed", "Number of Transactions Confirmed")).unwrap();
static ref TXS_FINALIZED: Counter =
register_counter!(opts!("txs_finalized", "Number of Transactions Finalized")).unwrap();
}
/// Background worker which listen's to new blocks /// Background worker which listen's to new blocks
/// and keeps a track of confirmed txs /// and keeps a track of confirmed txs
#[derive(Clone)] #[derive(Clone)]
pub struct BlockListener { pub struct BlockListener {
pub_sub_client: Arc<PubsubClient>,
commitment_config: CommitmentConfig,
tx_sender: TxSender, tx_sender: TxSender,
block_store: Arc<DashMap<String, BlockInformation>>, block_store: BlockStore,
latest_block_hash: Arc<RwLock<String>>,
pub signature_subscribers: Arc<DashMap<String, SubscriptionSink>>, pub signature_subscribers: Arc<DashMap<String, SubscriptionSink>>,
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct BlockInformation { pub struct BlockInformation {
pub slot: u64, pub slot: u64,
pub block_height: u64, pub block_height: u64,
@ -50,33 +74,12 @@ pub struct BlockListnerNotificatons {
} }
impl BlockListener { impl BlockListener {
pub async fn new( pub fn new(tx_sender: TxSender, block_store: BlockStore) -> Self {
pub_sub_client: Arc<PubsubClient>, Self {
rpc_client: Arc<RpcClient>,
tx_sender: TxSender,
commitment_config: CommitmentConfig,
) -> anyhow::Result<Self> {
let (latest_block_hash, block_height) = rpc_client
.get_latest_blockhash_with_commitment(commitment_config)
.await?;
let latest_block_hash = latest_block_hash.to_string();
let slot = rpc_client
.get_slot_with_commitment(commitment_config)
.await?;
Ok(Self {
pub_sub_client,
tx_sender, tx_sender,
latest_block_hash: Arc::new(RwLock::new(latest_block_hash.clone())), block_store,
block_store: Arc::new({
let map = DashMap::new();
map.insert(latest_block_hash, BlockInformation { slot, block_height });
map
}),
commitment_config,
signature_subscribers: Default::default(), signature_subscribers: Default::default(),
}) }
} }
pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize { pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize {
@ -89,31 +92,6 @@ impl BlockListener {
num_of_sigs_commited num_of_sigs_commited
} }
pub async fn get_latest_block_info(&self) -> (String, BlockInformation) {
let blockhash = &*self.latest_block_hash.read().await;
(
blockhash.to_owned(),
self.block_store
.get(blockhash)
.expect("Internal Error: Block store race condition")
.value()
.to_owned(),
)
}
pub async fn get_block_info(&self, blockhash: &str) -> Option<BlockInformation> {
let Some(info) = self.block_store.get(blockhash) else {
return None;
};
Some(info.value().to_owned())
}
pub async fn get_latest_blockhash(&self) -> String {
self.latest_block_hash.read().await.to_owned()
}
pub fn signature_subscribe(&self, signature: String, sink: SubscriptionSink) { pub fn signature_subscribe(&self, signature: String, sink: SubscriptionSink) {
let _ = self.signature_subscribers.insert(signature, sink); let _ = self.signature_subscribers.insert(signature, sink);
} }
@ -122,9 +100,21 @@ impl BlockListener {
self.signature_subscribers.remove(&signature); self.signature_subscribers.remove(&signature);
} }
pub fn listen(self, postgres: Option<PostgresMpscSend>) -> JoinHandle<anyhow::Result<()>> { fn increment_invalid_block_metric(commitment_config: CommitmentConfig) {
tokio::spawn(async move { if commitment_config.is_finalized() {
let commitment = self.commitment_config.commitment; INCOMPLETE_FIN_BLOCKS_RECV.inc();
} else {
INCOMPLETE_CON_BLOCKS_RECV.inc();
}
}
pub async fn listen_from_pubsub(
self,
pubsub_client: &PubsubClient,
commitment_config: CommitmentConfig,
postgres: &Option<PostgresMpscSend>,
) -> anyhow::Result<()> {
let commitment = commitment_config.commitment;
let comfirmation_status = match commitment { let comfirmation_status = match commitment {
CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized, CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
@ -133,16 +123,13 @@ impl BlockListener {
info!("Subscribing to {commitment:?} blocks"); info!("Subscribing to {commitment:?} blocks");
let (mut recv, _) = self let (mut recv, _) = pubsub_client
.pub_sub_client
.block_subscribe( .block_subscribe(
RpcBlockSubscribeFilter::All, RpcBlockSubscribeFilter::All,
Some(RpcBlockSubscribeConfig { Some(RpcBlockSubscribeConfig {
commitment: Some(self.commitment_config), commitment: Some(commitment_config),
encoding: None, encoding: None,
transaction_details: Some( transaction_details: Some(solana_transaction_status::TransactionDetails::Full),
solana_transaction_status::TransactionDetails::Full,
),
show_rewards: None, show_rewards: None,
max_supported_transaction_version: None, max_supported_transaction_version: None,
}), }),
@ -152,33 +139,66 @@ impl BlockListener {
info!("Listening to {commitment:?} blocks"); info!("Listening to {commitment:?} blocks");
while let Some(block) = recv.as_mut().next().await { loop {
let slot = block.value.slot; let timer = if commitment_config.is_finalized() {
TT_RECV_FIN_BLOCK.start_timer()
} else {
TT_RECV_CON_BLOCK.start_timer()
};
let Some(block) = recv.as_mut().next().await else {
bail!("PubSub broke");
};
timer.observe_duration();
if commitment_config.is_finalized() {
FIN_BLOCKS_RECV.inc();
} else {
CON_BLOCKS_RECV.inc();
};
let slot = block.context.slot;
let Some(block) = block.value.block else { let Some(block) = block.value.block else {
Self::increment_invalid_block_metric(commitment_config);
continue; continue;
}; };
let Some(block_height) = block.block_height else { let Some(block_height) = block.block_height else {
Self::increment_invalid_block_metric(commitment_config);
continue;
};
let Some(transactions) = block.transactions else {
Self::increment_invalid_block_metric(commitment_config);
continue; continue;
}; };
let blockhash = block.blockhash; let blockhash = block.blockhash;
let parent_slot = block.parent_slot;
let Some(transactions) = block.transactions else { self.block_store
.add_block(
blockhash.clone(),
BlockInformation { slot, block_height },
commitment_config,
)
.await;
if let Some(postgres) = &postgres {
let Some(rewards) = block.rewards else {
continue; continue;
}; };
let parent_slot = block.parent_slot; let Some(leader_reward) = rewards
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type) else {
continue;
};
// Write to block store first in order to prevent let _leader_id = &leader_reward.pubkey;
// any race condition i.e prevent some one to
// ask the map what it doesn't have rn
self.block_store
.insert(blockhash.clone(), BlockInformation { slot, block_height });
*self.latest_block_hash.write().await = blockhash;
if let Some(postgres) = &postgres {
postgres postgres
.send(PostgresMsg::PostgresBlock(PostgresBlock { .send(PostgresMsg::PostgresBlock(PostgresBlock {
slot: slot as i64, slot: slot as i64,
@ -202,6 +222,17 @@ impl BlockListener {
let sig = tx.get_signature().to_string(); let sig = tx.get_signature().to_string();
if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) { if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) {
//
// Metrics
//
if status.is_ok() {
if commitment_config.is_finalized() {
TXS_FINALIZED.inc();
} else {
TXS_CONFIRMED.inc();
}
}
tx_status.value_mut().status = Some(TransactionStatus { tx_status.value_mut().status = Some(TransactionStatus {
slot, slot,
confirmations: None, confirmations: None,
@ -245,8 +276,24 @@ impl BlockListener {
} }
} }
} }
}
bail!("Stopped Listening to {commitment:?} blocks") pub fn listen(
self,
commitment_config: CommitmentConfig,
postgres: Option<PostgresMpscSend>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
loop {
let ws_addr = &self.tx_sender.tpu_manager.ws_addr;
let pub_sub_client = PubsubClient::new(ws_addr).await?;
let err = self
.clone()
.listen_from_pubsub(&pub_sub_client, commitment_config, &postgres)
.await
.unwrap_err();
warn!("{commitment_config:?} Block Subscribe error {err}");
}
}) })
} }
} }

View File

@ -7,16 +7,16 @@ use super::{BlockListener, TxSender};
/// Background worker which cleans up memory /// Background worker which cleans up memory
#[derive(Clone)] #[derive(Clone)]
pub struct Cleaner<const N: usize> { pub struct Cleaner {
tx_sender: TxSender, tx_sender: TxSender,
block_listeners: [BlockListener; N], block_listenser: BlockListener,
} }
impl<const N: usize> Cleaner<N> { impl Cleaner {
pub fn new(tx_sender: TxSender, block_listeners: [BlockListener; N]) -> Self { pub fn new(tx_sender: TxSender, block_listenser: BlockListener) -> Self {
Self { Self {
tx_sender, tx_sender,
block_listeners, block_listenser,
} }
} }
@ -38,22 +38,20 @@ impl<const N: usize> Cleaner<N> {
/// Clean Signature Subscribers from Block Listeners /// Clean Signature Subscribers from Block Listeners
pub fn clean_block_listeners(&self) { pub fn clean_block_listeners(&self) {
for block_listenser in &self.block_listeners {
let mut to_remove = vec![]; let mut to_remove = vec![];
for subscriber in block_listenser.signature_subscribers.iter() { for subscriber in self.block_listenser.signature_subscribers.iter() {
if subscriber.value().is_closed() { if subscriber.value().is_closed() {
to_remove.push(subscriber.key().to_owned()); to_remove.push(subscriber.key().to_owned());
} }
} }
for to_remove in &to_remove { for to_remove in &to_remove {
block_listenser.signature_subscribers.remove(to_remove); self.block_listenser.signature_subscribers.remove(to_remove);
} }
info!("Cleaned {} Signature Subscribers", to_remove.len()); info!("Cleaned {} Signature Subscribers", to_remove.len());
} }
}
pub fn start(self, ttl_duration: Duration) -> JoinHandle<anyhow::Result<()>> { pub fn start(self, ttl_duration: Duration) -> JoinHandle<anyhow::Result<()>> {
let mut ttl = tokio::time::interval(ttl_duration); let mut ttl = tokio::time::interval(ttl_duration);

View File

@ -2,12 +2,12 @@ mod block_listenser;
mod cleaner; mod cleaner;
mod metrics_capture; mod metrics_capture;
mod postgres; mod postgres;
mod prometheus; mod prometheus_sync;
mod tx_sender; mod tx_sender;
pub use block_listenser::*; pub use block_listenser::*;
pub use cleaner::*; pub use cleaner::*;
pub use metrics_capture::*; pub use metrics_capture::*;
pub use postgres::*; pub use postgres::*;
pub use prometheus::*; pub use prometheus_sync::*;
pub use tx_sender::*; pub use tx_sender::*;

View File

@ -1,23 +1,13 @@
use std::collections::HashMap; use prometheus::{Encoder, TextEncoder};
use tokio::{ use tokio::{
io::AsyncWriteExt, io::AsyncWriteExt,
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream, ToSocketAddrs},
task::JoinHandle, task::JoinHandle,
}; };
use super::MetricsCapture; pub struct PrometheusSync;
#[derive(Clone)]
pub struct PrometheusSync {
metrics_capture: MetricsCapture,
}
impl PrometheusSync { impl PrometheusSync {
pub fn new(metrics_capture: MetricsCapture) -> Self {
Self { metrics_capture }
}
fn create_response(payload: &str) -> String { fn create_response(payload: &str) -> String {
format!( format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}", "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
@ -27,10 +17,16 @@ impl PrometheusSync {
} }
async fn handle_stream(&self, stream: &mut TcpStream) -> anyhow::Result<()> { async fn handle_stream(&self, stream: &mut TcpStream) -> anyhow::Result<()> {
let metrics = self.metrics_capture.get_metrics().await; let mut metrics_buffer = Vec::new();
let metrics = serde_prometheus::to_string(&metrics, Some("literpc"), HashMap::new())?; let encoder = TextEncoder::new();
let response = Self::create_response(&metrics); let metric_families = prometheus::gather();
encoder
.encode(&metric_families, &mut metrics_buffer)
.unwrap();
let metrics_buffer = String::from_utf8(metrics_buffer).unwrap();
let response = Self::create_response(&metrics_buffer);
stream.writable().await?; stream.writable().await?;
stream.write_all(response.as_bytes()).await?; stream.write_all(response.as_bytes()).await?;
@ -40,10 +36,10 @@ impl PrometheusSync {
Ok(()) Ok(())
} }
pub fn sync(self) -> JoinHandle<anyhow::Result<()>> { pub fn sync(self, addr: impl ToSocketAddrs + Send + 'static) -> JoinHandle<anyhow::Result<()>> {
#[allow(unreachable_code)] #[allow(unreachable_code)]
tokio::spawn(async move { tokio::spawn(async move {
let listener = TcpListener::bind("[::]:9091").await?; let listener = TcpListener::bind(addr).await?;
loop { loop {
let Ok((mut stream, _addr)) = listener.accept().await else { let Ok((mut stream, _addr)) = listener.accept().await else {

View File

@ -7,6 +7,7 @@ use anyhow::bail;
use dashmap::DashMap; use dashmap::DashMap;
use log::{info, warn}; use log::{info, warn};
use prometheus::{register_counter, Counter};
use solana_transaction_status::TransactionStatus; use solana_transaction_status::TransactionStatus;
use tokio::{ use tokio::{
sync::mpsc::{error::TryRecvError, UnboundedReceiver}, sync::mpsc::{error::TryRecvError, UnboundedReceiver},
@ -20,6 +21,11 @@ use crate::{
use super::PostgresMpscSend; use super::PostgresMpscSend;
lazy_static::lazy_static! {
static ref TXS_SENT: Counter =
register_counter!("txs_sent", "Number of transactions forwarded to tpu").unwrap();
}
pub type WireTransaction = Vec<u8>; pub type WireTransaction = Vec<u8>;
/// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions /// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions
@ -28,7 +34,7 @@ pub struct TxSender {
/// Tx(s) forwarded to tpu /// Tx(s) forwarded to tpu
pub txs_sent: Arc<DashMap<String, TxProps>>, pub txs_sent: Arc<DashMap<String, TxProps>>,
/// TpuClient to call the tpu port /// TpuClient to call the tpu port
tpu_manager: Arc<TpuManager>, pub tpu_manager: Arc<TpuManager>,
} }
/// Transaction Properties /// Transaction Properties
@ -77,6 +83,9 @@ impl TxSender {
for (sig, _) in &sigs_and_slots { for (sig, _) in &sigs_and_slots {
txs_sent.insert(sig.to_owned(), TxProps::default()); txs_sent.insert(sig.to_owned(), TxProps::default());
} }
// metrics
TXS_SENT.inc_by(sigs_and_slots.len() as f64);
1 1
} }
Err(err) => { Err(err) => {
@ -86,15 +95,17 @@ impl TxSender {
}; };
if let Some(postgres) = postgres { if let Some(postgres) = postgres {
let forwarded_slot = tpu_client.get_tpu_client().await.estimated_current_slot();
for (sig, recent_slot) in sigs_and_slots { for (sig, recent_slot) in sigs_and_slots {
postgres postgres
.send(PostgresMsg::PostgresTx(PostgresTx { .send(PostgresMsg::PostgresTx(PostgresTx {
signature: sig.clone(), signature: sig.clone(),
recent_slot: recent_slot as i64, recent_slot: recent_slot as i64,
forwarded_slot: 0, // FIX: figure this out forwarded_slot: forwarded_slot as i64,
processed_slot: None, // FIX: figure this out processed_slot: None,
cu_consumed: None, // FIX: figure this out cu_consumed: None,
cu_requested: None, // FIX: figure this out cu_requested: None,
quic_response, quic_response,
})) }))
.expect("Error writing to postgres service"); .expect("Error writing to postgres service");

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use bench::helpers::BenchHelper; use bench::helpers::BenchHelper;
use lite_rpc::DEFAULT_LITE_RPC_ADDR; use lite_rpc::DEFAULT_LITE_RPC_ADDR;
use log::info; use log::info;
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction}; use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::commitment_config::CommitmentConfig;
const AMOUNT: usize = 5; const AMOUNT: usize = 5;

34
tests/diff.rs Normal file
View File

@ -0,0 +1,34 @@
use lite_rpc::{DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
#[tokio::test]
async fn diff_rpc() -> anyhow::Result<()> {
let rpc_client = RpcClient::new(DEFAULT_RPC_ADDR.to_string());
let lite_rpc_client = RpcClient::new(DEFAULT_LITE_RPC_ADDR.to_string());
check_block_hash(&rpc_client, &lite_rpc_client, CommitmentConfig::confirmed()).await?;
check_block_hash(&rpc_client, &lite_rpc_client, CommitmentConfig::finalized()).await?;
Ok(())
}
async fn check_block_hash(
rpc_client: &RpcClient,
lite_rpc_client: &RpcClient,
commitment_config: CommitmentConfig,
) -> anyhow::Result<()> {
let rpc_blockhash = rpc_client
.get_latest_blockhash_with_commitment(commitment_config)
.await?;
let lite_blockhash = lite_rpc_client
.get_latest_blockhash_with_commitment(commitment_config)
.await?;
println!("{commitment_config:?} {rpc_blockhash:?} {lite_blockhash:?}");
assert_eq!(rpc_blockhash.0, lite_blockhash.0);
assert_eq!(rpc_blockhash.1, lite_blockhash.1);
Ok(())
}

View File

@ -3,13 +3,14 @@ use std::{sync::Arc, time::Duration};
use bench::helpers::BenchHelper; use bench::helpers::BenchHelper;
use futures::future::try_join_all; use futures::future::try_join_all;
use lite_rpc::{ use lite_rpc::{
block_store::BlockStore,
encoding::BinaryEncoding, encoding::BinaryEncoding,
tpu_manager::TpuManager, tpu_manager::TpuManager,
workers::{BlockListener, TxSender}, workers::{BlockListener, TxSender},
DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS, DEFAULT_TX_BATCH_SIZE, DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS, DEFAULT_TX_BATCH_SIZE,
DEFAULT_WS_ADDR, DEFAULT_WS_ADDR,
}; };
use solana_client::nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient}; use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionConfirmationStatus; use solana_transaction_status::TransactionConfirmationStatus;
@ -31,23 +32,17 @@ async fn send_and_confirm_txs() {
.unwrap(), .unwrap(),
); );
let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await.unwrap());
let tx_sender = TxSender::new(tpu_client); let tx_sender = TxSender::new(tpu_client);
let block_store = BlockStore::new(&rpc_client).await.unwrap();
let block_listener = BlockListener::new( let block_listener = BlockListener::new(tx_sender.clone(), block_store);
pub_sub_client.clone(),
rpc_client.clone(),
tx_sender.clone(),
CommitmentConfig::confirmed(),
)
.await
.unwrap();
let (tx_send, tx_recv) = mpsc::unbounded_channel(); let (tx_send, tx_recv) = mpsc::unbounded_channel();
let services = try_join_all(vec![ let services = try_join_all(vec![
block_listener.clone().listen(None), block_listener
.clone()
.listen(CommitmentConfig::confirmed(), None),
tx_sender.clone().execute( tx_sender.clone().execute(
tx_recv, tx_recv,
DEFAULT_TX_BATCH_SIZE, DEFAULT_TX_BATCH_SIZE,