reset pub_sub client
This commit is contained in:
parent
b890ee4052
commit
a20d23e924
|
@ -18,16 +18,13 @@ use log::info;
|
|||
|
||||
use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink};
|
||||
|
||||
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
|
||||
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
|
||||
use solana_rpc_client_api::{
|
||||
config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig},
|
||||
response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
|
||||
};
|
||||
use solana_sdk::{
|
||||
commitment_config::{CommitmentConfig},
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey,
|
||||
transaction::VersionedTransaction,
|
||||
};
|
||||
use solana_transaction_status::TransactionStatus;
|
||||
|
@ -51,7 +48,6 @@ pub struct LiteBridge {
|
|||
impl LiteBridge {
|
||||
pub async fn new(rpc_url: String, ws_addr: String, fanout_slots: u64) -> anyhow::Result<Self> {
|
||||
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
|
||||
let pub_sub_client = Arc::new(PubsubClient::new(&ws_addr).await?);
|
||||
|
||||
let tpu_manager =
|
||||
Arc::new(TpuManager::new(rpc_client.clone(), ws_addr, fanout_slots).await?);
|
||||
|
@ -60,11 +56,7 @@ impl LiteBridge {
|
|||
|
||||
let block_store = BlockStore::new(&rpc_client).await?;
|
||||
|
||||
let block_listner = BlockListener::new(
|
||||
pub_sub_client.clone(),
|
||||
tx_sender.clone(),
|
||||
block_store.clone(),
|
||||
);
|
||||
let block_listner = BlockListener::new(tx_sender.clone(), block_store.clone());
|
||||
|
||||
Ok(Self {
|
||||
rpc_client,
|
||||
|
|
|
@ -4,7 +4,7 @@ use anyhow::{bail, Context};
|
|||
use dashmap::DashMap;
|
||||
use futures::StreamExt;
|
||||
use jsonrpsee::SubscriptionSink;
|
||||
use log::info;
|
||||
use log::{info, warn};
|
||||
use prometheus::{histogram_opts, opts, register_counter, register_histogram, Counter, Histogram};
|
||||
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
|
||||
use solana_rpc_client::rpc_client::SerializableTransaction;
|
||||
|
@ -49,7 +49,6 @@ lazy_static::lazy_static! {
|
|||
/// and keeps a track of confirmed txs
|
||||
#[derive(Clone)]
|
||||
pub struct BlockListener {
|
||||
pub_sub_client: Arc<PubsubClient>,
|
||||
tx_sender: TxSender,
|
||||
block_store: BlockStore,
|
||||
pub signature_subscribers: Arc<DashMap<String, SubscriptionSink>>,
|
||||
|
@ -67,13 +66,8 @@ pub struct BlockListnerNotificatons {
|
|||
}
|
||||
|
||||
impl BlockListener {
|
||||
pub fn new(
|
||||
pub_sub_client: Arc<PubsubClient>,
|
||||
tx_sender: TxSender,
|
||||
block_store: BlockStore,
|
||||
) -> Self {
|
||||
pub fn new(tx_sender: TxSender, block_store: BlockStore) -> Self {
|
||||
Self {
|
||||
pub_sub_client,
|
||||
tx_sender,
|
||||
block_store,
|
||||
signature_subscribers: Default::default(),
|
||||
|
@ -98,168 +92,183 @@ impl BlockListener {
|
|||
self.signature_subscribers.remove(&signature);
|
||||
}
|
||||
|
||||
pub async fn listen_from_pubsub(
|
||||
self,
|
||||
pubsub_client: &PubsubClient,
|
||||
commitment_config: CommitmentConfig,
|
||||
postgres: &Option<PostgresMpscSend>,
|
||||
) -> anyhow::Result<()> {
|
||||
let commitment = commitment_config.commitment;
|
||||
|
||||
let comfirmation_status = match commitment {
|
||||
CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
|
||||
_ => TransactionConfirmationStatus::Confirmed,
|
||||
};
|
||||
|
||||
info!("Subscribing to {commitment:?} blocks");
|
||||
|
||||
let (mut recv, _) = pubsub_client
|
||||
.block_subscribe(
|
||||
RpcBlockSubscribeFilter::All,
|
||||
Some(RpcBlockSubscribeConfig {
|
||||
commitment: Some(commitment_config),
|
||||
encoding: None,
|
||||
transaction_details: Some(solana_transaction_status::TransactionDetails::Full),
|
||||
show_rewards: None,
|
||||
max_supported_transaction_version: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.context("Error calling block_subscribe")?;
|
||||
|
||||
info!("Listening to {commitment:?} blocks");
|
||||
|
||||
loop {
|
||||
let timer = if commitment_config.is_finalized() {
|
||||
TT_RECV_FIN_BLOCK.start_timer()
|
||||
} else {
|
||||
TT_RECV_CON_BLOCK.start_timer()
|
||||
};
|
||||
|
||||
let Some(block) = recv.as_mut().next().await else {
|
||||
bail!("PubSub broke");
|
||||
};
|
||||
|
||||
timer.observe_duration();
|
||||
|
||||
let slot = block.context.slot;
|
||||
|
||||
let Some(block) = block.value.block else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(block_height) = block.block_height else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let blockhash = block.blockhash;
|
||||
|
||||
let Some(transactions) = block.transactions else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let parent_slot = block.parent_slot;
|
||||
|
||||
self.block_store
|
||||
.add_block(
|
||||
blockhash.clone(),
|
||||
BlockInformation { slot, block_height },
|
||||
commitment_config,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(postgres) = &postgres {
|
||||
let Some(rewards) = block.rewards else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(leader_reward) = rewards
|
||||
.iter()
|
||||
.find(|reward| Some(RewardType::Fee) == reward.reward_type) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let _leader_id = &leader_reward.pubkey;
|
||||
|
||||
postgres
|
||||
.send(PostgresMsg::PostgresBlock(PostgresBlock {
|
||||
slot: slot as i64,
|
||||
leader_id: 0, //FIX:
|
||||
parent_slot: parent_slot as i64,
|
||||
}))
|
||||
.expect("Error sending block to postgres service");
|
||||
}
|
||||
|
||||
for tx in transactions {
|
||||
let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else {
|
||||
info!("tx with no meta");
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(tx) = tx.transaction.decode() else {
|
||||
info!("unable to decode tx");
|
||||
continue;
|
||||
};
|
||||
|
||||
let sig = tx.get_signature().to_string();
|
||||
|
||||
if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) {
|
||||
//
|
||||
// Metrics
|
||||
//
|
||||
if status.is_ok() {
|
||||
if commitment_config.is_finalized() {
|
||||
TXS_FINALIZED.inc();
|
||||
} else {
|
||||
TXS_CONFIRMED.inc();
|
||||
}
|
||||
}
|
||||
|
||||
tx_status.value_mut().status = Some(TransactionStatus {
|
||||
slot,
|
||||
confirmations: None,
|
||||
status,
|
||||
err: err.clone(),
|
||||
confirmation_status: Some(comfirmation_status.clone()),
|
||||
});
|
||||
|
||||
//
|
||||
// Write to postgres
|
||||
//
|
||||
if let Some(postgres) = &postgres {
|
||||
let cu_consumed = match compute_units_consumed {
|
||||
OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
postgres
|
||||
.send(PostgresMsg::PostgresUpdateTx(
|
||||
PostgresUpdateTx {
|
||||
processed_slot: slot as i64,
|
||||
cu_consumed,
|
||||
cu_requested: None, //TODO: cu requested
|
||||
},
|
||||
sig.clone(),
|
||||
))
|
||||
.unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
// subscribers
|
||||
if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) {
|
||||
// none if transaction succeeded
|
||||
sink.send(&RpcResponse {
|
||||
context: RpcResponseContext {
|
||||
slot,
|
||||
api_version: None,
|
||||
},
|
||||
value: serde_json::json!({ "err": err }),
|
||||
})?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn listen(
|
||||
self,
|
||||
commitment_config: CommitmentConfig,
|
||||
postgres: Option<PostgresMpscSend>,
|
||||
) -> JoinHandle<anyhow::Result<()>> {
|
||||
tokio::spawn(async move {
|
||||
let commitment = commitment_config.commitment;
|
||||
|
||||
let comfirmation_status = match commitment {
|
||||
CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
|
||||
_ => TransactionConfirmationStatus::Confirmed,
|
||||
};
|
||||
|
||||
info!("Subscribing to {commitment:?} blocks");
|
||||
|
||||
let (mut recv, _) = self
|
||||
.pub_sub_client
|
||||
.block_subscribe(
|
||||
RpcBlockSubscribeFilter::All,
|
||||
Some(RpcBlockSubscribeConfig {
|
||||
commitment: Some(commitment_config),
|
||||
encoding: None,
|
||||
transaction_details: Some(
|
||||
solana_transaction_status::TransactionDetails::Full,
|
||||
),
|
||||
show_rewards: None,
|
||||
max_supported_transaction_version: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.context("Error calling block_subscribe")?;
|
||||
|
||||
info!("Listening to {commitment:?} blocks");
|
||||
|
||||
loop {
|
||||
let timer = if commitment_config.is_finalized() {
|
||||
TT_RECV_FIN_BLOCK.start_timer()
|
||||
} else {
|
||||
TT_RECV_CON_BLOCK.start_timer()
|
||||
};
|
||||
|
||||
let Some(block) = recv.as_mut().next().await else {
|
||||
bail!("PubSub broke");
|
||||
};
|
||||
|
||||
timer.observe_duration();
|
||||
|
||||
let slot = block.context.slot;
|
||||
|
||||
let Some(block) = block.value.block else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(block_height) = block.block_height else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let blockhash = block.blockhash;
|
||||
|
||||
let Some(transactions) = block.transactions else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let parent_slot = block.parent_slot;
|
||||
|
||||
self.block_store
|
||||
.add_block(
|
||||
blockhash.clone(),
|
||||
BlockInformation { slot, block_height },
|
||||
commitment_config,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(postgres) = &postgres {
|
||||
let Some(rewards) = block.rewards else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(leader_reward) = rewards
|
||||
.iter()
|
||||
.find(|reward| Some(RewardType::Fee) == reward.reward_type) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let _leader_id = &leader_reward.pubkey;
|
||||
|
||||
postgres
|
||||
.send(PostgresMsg::PostgresBlock(PostgresBlock {
|
||||
slot: slot as i64,
|
||||
leader_id: 0, //FIX:
|
||||
parent_slot: parent_slot as i64,
|
||||
}))
|
||||
.expect("Error sending block to postgres service");
|
||||
}
|
||||
|
||||
for tx in transactions {
|
||||
let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else {
|
||||
info!("tx with no meta");
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(tx) = tx.transaction.decode() else {
|
||||
info!("unable to decode tx");
|
||||
continue;
|
||||
};
|
||||
|
||||
let sig = tx.get_signature().to_string();
|
||||
|
||||
if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) {
|
||||
//
|
||||
// Metrics
|
||||
//
|
||||
if status.is_ok() {
|
||||
if commitment_config.is_finalized() {
|
||||
TXS_FINALIZED.inc();
|
||||
} else {
|
||||
TXS_CONFIRMED.inc();
|
||||
}
|
||||
}
|
||||
|
||||
tx_status.value_mut().status = Some(TransactionStatus {
|
||||
slot,
|
||||
confirmations: None,
|
||||
status,
|
||||
err: err.clone(),
|
||||
confirmation_status: Some(comfirmation_status.clone()),
|
||||
});
|
||||
|
||||
//
|
||||
// Write to postgres
|
||||
//
|
||||
if let Some(postgres) = &postgres {
|
||||
let cu_consumed = match compute_units_consumed {
|
||||
OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
postgres
|
||||
.send(PostgresMsg::PostgresUpdateTx(
|
||||
PostgresUpdateTx {
|
||||
processed_slot: slot as i64,
|
||||
cu_consumed,
|
||||
cu_requested: None, //TODO: cu requested
|
||||
},
|
||||
sig.clone(),
|
||||
))
|
||||
.unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
// subscribers
|
||||
if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) {
|
||||
// none if transaction succeeded
|
||||
sink.send(&RpcResponse {
|
||||
context: RpcResponseContext {
|
||||
slot,
|
||||
api_version: None,
|
||||
},
|
||||
value: serde_json::json!({ "err": err }),
|
||||
})?;
|
||||
}
|
||||
}
|
||||
let ws_addr = &self.tx_sender.tpu_manager.ws_addr;
|
||||
let pub_sub_client = PubsubClient::new(ws_addr).await?;
|
||||
let err = self
|
||||
.clone()
|
||||
.listen_from_pubsub(&pub_sub_client, commitment_config, &postgres)
|
||||
.await
|
||||
.unwrap_err();
|
||||
warn!("{commitment_config:?} Block Subscribe error {err}");
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ pub struct TxSender {
|
|||
/// Tx(s) forwarded to tpu
|
||||
pub txs_sent: Arc<DashMap<String, TxProps>>,
|
||||
/// TpuClient to call the tpu port
|
||||
tpu_manager: Arc<TpuManager>,
|
||||
pub tpu_manager: Arc<TpuManager>,
|
||||
}
|
||||
|
||||
/// Transaction Properties
|
||||
|
|
|
@ -10,7 +10,6 @@ use lite_rpc::{
|
|||
DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS, DEFAULT_TX_BATCH_SIZE,
|
||||
DEFAULT_WS_ADDR,
|
||||
};
|
||||
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
|
||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
|
@ -33,12 +32,10 @@ async fn send_and_confirm_txs() {
|
|||
.unwrap(),
|
||||
);
|
||||
|
||||
let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await.unwrap());
|
||||
|
||||
let tx_sender = TxSender::new(tpu_client);
|
||||
let block_store = BlockStore::new(&rpc_client).await.unwrap();
|
||||
|
||||
let block_listener = BlockListener::new(pub_sub_client.clone(), tx_sender.clone(), block_store);
|
||||
let block_listener = BlockListener::new(tx_sender.clone(), block_store);
|
||||
|
||||
let (tx_send, tx_recv) = mpsc::unbounded_channel();
|
||||
|
||||
|
|
Loading…
Reference in New Issue