diff --git a/src/block_store.rs b/src/block_store.rs new file mode 100644 index 00000000..65246217 --- /dev/null +++ b/src/block_store.rs @@ -0,0 +1,37 @@ +use std::sync::{Arc, RwLock}; + +use dashmap::DashMap; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::commitment_config::CommitmentConfig; + +use crate::workers::BlockInformation; + +pub struct BlockStore { + blocks: Arc>, + latest_block_hash: Arc>, +} + +impl BlockStore { + pub async fn new( + rpc_client: &RpcClient, + commitment_config: CommitmentConfig, + ) -> anyhow::Result { + let (latest_block_hash, block_height) = rpc_client + .get_latest_blockhash_with_commitment(commitment_config) + .await?; + + let latest_block_hash = latest_block_hash.to_string(); + let slot = rpc_client + .get_slot_with_commitment(commitment_config) + .await?; + + Ok(Self { + latest_block_hash: Arc::new(RwLock::new(latest_block_hash.clone())), + blocks: Arc::new({ + let map = DashMap::new(); + map.insert(latest_block_hash, BlockInformation { slot, block_height }); + map + }), + }) + } +} diff --git a/src/lib.rs b/src/lib.rs index 4bb5e821..041530c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub mod errors; pub mod rpc; pub mod tpu_manager; pub mod workers; +pub mod block_store; #[from_env] pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899"; diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index 1cefd389..77620925 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -15,8 +15,8 @@ use solana_rpc_client_api::{ use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; use solana_transaction_status::{ - option_serializer::OptionSerializer, TransactionConfirmationStatus, TransactionStatus, - UiConfirmedBlock, UiTransactionStatusMeta, + option_serializer::OptionSerializer, RewardType, TransactionConfirmationStatus, + TransactionStatus, UiConfirmedBlock, UiTransactionStatusMeta, }; use tokio::{ sync::{mpsc::Sender, RwLock}, @@ -57,14 +57,6 @@ impl BlockListener { tx_sender: TxSender, commitment_config: CommitmentConfig, ) -> anyhow::Result { - let (latest_block_hash, block_height) = rpc_client - .get_latest_blockhash_with_commitment(commitment_config) - .await?; - - let latest_block_hash = latest_block_hash.to_string(); - let slot = rpc_client - .get_slot_with_commitment(commitment_config) - .await?; Ok(Self { pub_sub_client, @@ -97,7 +89,7 @@ impl BlockListener { blockhash.to_owned(), self.block_store .get(blockhash) - .expect("Latest Block Not in Map") + .expect("Race Condition: Latest block not in block store") .value() .to_owned(), ) @@ -172,15 +164,27 @@ impl BlockListener { let parent_slot = block.parent_slot; - *self.latest_block_hash.write().await = blockhash.clone(); self.block_store .insert(blockhash, BlockInformation { slot, block_height }); + *self.latest_block_hash.write().await = blockhash.clone(); if let Some(postgres) = &postgres { + let Some(rewards) = block.rewards else { + continue; + }; + + let Some(leader_reward) = rewards + .iter() + .find(|reward| Some(RewardType::Fee) == reward.reward_type) else { + continue; + }; + + let leader_id = &leader_reward.pubkey; + postgres .send(PostgresMsg::PostgresBlock(PostgresBlock { slot: slot as i64, - leader_id: 0, //FIX: + leader_id, //FIX: parent_slot: parent_slot as i64, })) .expect("Error sending block to postgres service");