This commit is contained in:
aniketfuryrocks 2023-02-07 20:04:50 +05:30 committed by Godmode Galactus
parent b2abb70b6f
commit db0e01506d
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
2 changed files with 174 additions and 158 deletions

View File

@ -81,7 +81,8 @@ impl LiteBridge {
let block_store = BlockStore::new(&rpc_client).await?;
let block_listner = BlockListener::new(tx_sender.clone(), block_store.clone());
let block_listner =
BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store.clone());
Ok(Self {
rpc_client,

View File

@ -1,23 +1,25 @@
use std::sync::Arc;
use anyhow::{bail, Context};
use dashmap::DashMap;
use futures::StreamExt;
use futures::future::try_join_all;
use jsonrpsee::SubscriptionSink;
use log::{info, warn};
use prometheus::{histogram_opts, opts, register_counter, register_histogram, Counter, Histogram};
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_rpc_client_api::{
config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
config::RpcBlockConfig,
response::{Response as RpcResponse, RpcResponseContext},
};
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
slot_history::Slot,
};
use solana_transaction_status::{
option_serializer::OptionSerializer, RewardType, TransactionConfirmationStatus,
TransactionStatus, UiConfirmedBlock, UiTransactionStatusMeta,
TransactionDetails, TransactionStatus, UiConfirmedBlock, UiTransactionStatusMeta,
};
use tokio::{sync::mpsc::Sender, task::JoinHandle};
@ -59,6 +61,7 @@ lazy_static::lazy_static! {
pub struct BlockListener {
tx_sender: TxSender,
block_store: BlockStore,
rpc_client: Arc<RpcClient>,
pub signature_subscribers: Arc<DashMap<String, SubscriptionSink>>,
}
@ -74,8 +77,9 @@ pub struct BlockListnerNotificatons {
}
impl BlockListener {
pub fn new(tx_sender: TxSender, block_store: BlockStore) -> Self {
pub fn new(rpc_client: Arc<RpcClient>, tx_sender: TxSender, block_store: BlockStore) -> Self {
Self {
rpc_client,
tx_sender,
block_store,
signature_subscribers: Default::default(),
@ -108,174 +112,156 @@ impl BlockListener {
}
}
pub async fn listen_from_pubsub(
self,
pubsub_client: &PubsubClient,
pub async fn index_slot(
&self,
slot: Slot,
commitment_config: CommitmentConfig,
postgres: &Option<PostgresMpscSend>,
postgres: Option<PostgresMpscSend>,
) -> anyhow::Result<()> {
let commitment = commitment_config.commitment;
let comfirmation_status = match commitment {
let comfirmation_status = match commitment_config.commitment {
CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
_ => TransactionConfirmationStatus::Confirmed,
};
info!("Subscribing to {commitment:?} blocks");
let timer = if commitment_config.is_finalized() {
TT_RECV_FIN_BLOCK.start_timer()
} else {
TT_RECV_CON_BLOCK.start_timer()
};
let (mut recv, _) = pubsub_client
.block_subscribe(
RpcBlockSubscribeFilter::All,
Some(RpcBlockSubscribeConfig {
let block = self
.rpc_client
.get_block_with_config(
slot,
RpcBlockConfig {
transaction_details: Some(TransactionDetails::Full),
commitment: Some(commitment_config),
encoding: None,
transaction_details: Some(solana_transaction_status::TransactionDetails::Full),
show_rewards: None,
max_supported_transaction_version: None,
}),
..Default::default()
},
)
.await
.context("Error calling block_subscribe")?;
.await?;
info!("Listening to {commitment:?} blocks");
timer.observe_duration();
loop {
let timer = if commitment_config.is_finalized() {
TT_RECV_FIN_BLOCK.start_timer()
} else {
TT_RECV_CON_BLOCK.start_timer()
};
if commitment_config.is_finalized() {
FIN_BLOCKS_RECV.inc();
} else {
CON_BLOCKS_RECV.inc();
};
let Some(block) = recv.as_mut().next().await else {
bail!("PubSub broke");
};
let Some(block_height) = block.block_height else {
Self::increment_invalid_block_metric(commitment_config);
return Ok(());
};
timer.observe_duration();
if commitment_config.is_finalized() {
FIN_BLOCKS_RECV.inc();
} else {
CON_BLOCKS_RECV.inc();
};
let slot = block.context.slot;
let Some(block) = block.value.block else {
let Some(transactions) = block.transactions else {
Self::increment_invalid_block_metric(commitment_config);
continue;
};
return Ok(());
};
let Some(block_height) = block.block_height else {
Self::increment_invalid_block_metric(commitment_config);
continue;
};
let blockhash = block.blockhash;
let parent_slot = block.parent_slot;
let Some(transactions) = block.transactions else {
Self::increment_invalid_block_metric(commitment_config);
continue;
};
self.block_store
.add_block(
blockhash.clone(),
BlockInformation { slot, block_height },
commitment_config,
)
.await;
let blockhash = block.blockhash;
let parent_slot = block.parent_slot;
if let Some(postgres) = &postgres {
let Some(rewards) = block.rewards else {
return Ok(());
};
self.block_store
.add_block(
blockhash.clone(),
BlockInformation { slot, block_height },
commitment_config,
)
.await;
let Some(leader_reward) = rewards
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type) else {
return Ok(());
if let Some(postgres) = &postgres {
let Some(rewards) = block.rewards else {
};
let _leader_id = &leader_reward.pubkey;
postgres
.send(PostgresMsg::PostgresBlock(PostgresBlock {
slot: slot as i64,
leader_id: 0, //FIX:
parent_slot: parent_slot as i64,
}))
.expect("Error sending block to postgres service");
}
for tx in transactions {
let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else {
info!("tx with no meta");
continue;
};
let Some(tx) = tx.transaction.decode() else {
info!("unable to decode tx");
continue;
};
let Some(leader_reward) = rewards
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type) else {
continue;
};
let sig = tx.get_signature().to_string();
let _leader_id = &leader_reward.pubkey;
postgres
.send(PostgresMsg::PostgresBlock(PostgresBlock {
slot: slot as i64,
leader_id: 0, //FIX:
parent_slot: parent_slot as i64,
}))
.expect("Error sending block to postgres service");
}
for tx in transactions {
let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else {
info!("tx with no meta");
continue;
};
let Some(tx) = tx.transaction.decode() else {
info!("unable to decode tx");
continue;
};
let sig = tx.get_signature().to_string();
if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) {
//
// Metrics
//
if status.is_ok() {
if commitment_config.is_finalized() {
TXS_FINALIZED.inc();
} else {
TXS_CONFIRMED.inc();
}
if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) {
//
// Metrics
//
if status.is_ok() {
if commitment_config.is_finalized() {
TXS_FINALIZED.inc();
} else {
TXS_CONFIRMED.inc();
}
tx_status.value_mut().status = Some(TransactionStatus {
slot,
confirmations: None,
status,
err: err.clone(),
confirmation_status: Some(comfirmation_status.clone()),
});
//
// Write to postgres
//
if let Some(postgres) = &postgres {
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64),
_ => None,
};
postgres
.send(PostgresMsg::PostgresUpdateTx(
PostgresUpdateTx {
processed_slot: slot as i64,
cu_consumed,
cu_requested: None, //TODO: cu requested
},
sig.clone(),
))
.unwrap();
}
};
// subscribers
if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) {
// none if transaction succeeded
sink.send(&RpcResponse {
context: RpcResponseContext {
slot,
api_version: None,
},
value: serde_json::json!({ "err": err }),
})?;
}
tx_status.value_mut().status = Some(TransactionStatus {
slot,
confirmations: None,
status,
err: err.clone(),
confirmation_status: Some(comfirmation_status.clone()),
});
//
// Write to postgres
//
if let Some(postgres) = &postgres {
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64),
_ => None,
};
postgres
.send(PostgresMsg::PostgresUpdateTx(
PostgresUpdateTx {
processed_slot: slot as i64,
cu_consumed,
cu_requested: None, //TODO: cu requested
},
sig.clone(),
))
.unwrap();
}
};
// subscribers
if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) {
// none if transaction succeeded
sink.send(&RpcResponse {
context: RpcResponseContext {
slot,
api_version: None,
},
value: serde_json::json!({ "err": err }),
})?;
}
}
Ok(())
}
pub fn listen(
@ -284,15 +270,44 @@ impl BlockListener {
postgres: Option<PostgresMpscSend>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
let commitment = commitment_config.commitment;
info!("Listening to {commitment:?} blocks");
loop {
let ws_addr = &self.tx_sender.tpu_manager.ws_addr;
let pub_sub_client = PubsubClient::new(ws_addr).await?;
let err = self
.clone()
.listen_from_pubsub(&pub_sub_client, commitment_config, &postgres)
.await
.unwrap_err();
warn!("{commitment_config:?} Block Subscribe error {err}");
let (
_,
BlockInformation {
slot: latest_slot,
block_height: _,
},
) = self
.block_store
.get_latest_block_info(commitment_config)
.await;
let block_slots = self
.rpc_client
.get_blocks_with_commitment(latest_slot, None, commitment_config)
.await?;
let block_future_handlers = block_slots.into_iter().map(|slot| {
let this = self.clone();
let postgres = postgres.clone();
tokio::spawn(async move {
if let Err(err) = this
.index_slot(slot, commitment_config, postgres.clone())
.await
{
warn!(
"Error while indexing {commitment_config:?} block with slot {slot} {err}"
);
};
})
});
let _ = try_join_all(block_future_handlers).await;
}
})
}