This commit is contained in:
aniketfuryrocks 2023-02-02 18:00:09 +05:30
parent 188ed25e51
commit 566b4ee7ec
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
3 changed files with 55 additions and 13 deletions

37
src/block_store.rs Normal file
View File

@ -0,0 +1,37 @@
use std::sync::{Arc, RwLock};
use dashmap::DashMap;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use crate::workers::BlockInformation;
pub struct BlockStore {
blocks: Arc<DashMap<String, BlockInformation>>,
latest_block_hash: Arc<RwLock<String>>,
}
impl BlockStore {
pub async fn new(
rpc_client: &RpcClient,
commitment_config: CommitmentConfig,
) -> anyhow::Result<Self> {
let (latest_block_hash, block_height) = rpc_client
.get_latest_blockhash_with_commitment(commitment_config)
.await?;
let latest_block_hash = latest_block_hash.to_string();
let slot = rpc_client
.get_slot_with_commitment(commitment_config)
.await?;
Ok(Self {
latest_block_hash: Arc::new(RwLock::new(latest_block_hash.clone())),
blocks: Arc::new({
let map = DashMap::new();
map.insert(latest_block_hash, BlockInformation { slot, block_height });
map
}),
})
}
}

View File

@ -9,6 +9,7 @@ pub mod errors;
pub mod rpc;
pub mod tpu_manager;
pub mod workers;
pub mod block_store;
#[from_env]
pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899";

View File

@ -15,8 +15,8 @@ use solana_rpc_client_api::{
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_transaction_status::{
option_serializer::OptionSerializer, TransactionConfirmationStatus, TransactionStatus,
UiConfirmedBlock, UiTransactionStatusMeta,
option_serializer::OptionSerializer, RewardType, TransactionConfirmationStatus,
TransactionStatus, UiConfirmedBlock, UiTransactionStatusMeta,
};
use tokio::{
sync::{mpsc::Sender, RwLock},
@ -57,14 +57,6 @@ impl BlockListener {
tx_sender: TxSender,
commitment_config: CommitmentConfig,
) -> anyhow::Result<Self> {
let (latest_block_hash, block_height) = rpc_client
.get_latest_blockhash_with_commitment(commitment_config)
.await?;
let latest_block_hash = latest_block_hash.to_string();
let slot = rpc_client
.get_slot_with_commitment(commitment_config)
.await?;
Ok(Self {
pub_sub_client,
@ -97,7 +89,7 @@ impl BlockListener {
blockhash.to_owned(),
self.block_store
.get(blockhash)
.expect("Latest Block Not in Map")
.expect("Race Condition: Latest block not in block store")
.value()
.to_owned(),
)
@ -172,15 +164,27 @@ impl BlockListener {
let parent_slot = block.parent_slot;
*self.latest_block_hash.write().await = blockhash.clone();
self.block_store
.insert(blockhash, BlockInformation { slot, block_height });
*self.latest_block_hash.write().await = blockhash.clone();
if let Some(postgres) = &postgres {
let Some(rewards) = block.rewards else {
continue;
};
let Some(leader_reward) = rewards
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type) else {
continue;
};
let leader_id = &leader_reward.pubkey;
postgres
.send(PostgresMsg::PostgresBlock(PostgresBlock {
slot: slot as i64,
leader_id: 0, //FIX:
leader_id, //FIX:
parent_slot: parent_slot as i64,
}))
.expect("Error sending block to postgres service");