diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs
index 432a3824..62c42fbe 100644
--- a/cluster-endpoints/src/grpc_subscription.rs
+++ b/cluster-endpoints/src/grpc_subscription.rs
@@ -8,7 +8,7 @@ use itertools::Itertools;
 use solana_client::nonblocking::rpc_client::RpcClient;
 use solana_lite_rpc_core::{
     structures::{
-        processed_block::{ProcessedBlock, TransactionInfo},
+        produced_block::{ProducedBlock, TransactionInfo},
         slot_notification::SlotNotification,
     },
     AnyhowJoinHandle,
@@ -39,7 +39,7 @@ use yellowstone_grpc_proto::prelude::{
 fn process_block(
     block: SubscribeUpdateBlock,
     commitment_config: CommitmentConfig,
-) -> ProcessedBlock {
+) -> ProducedBlock {
     let txs: Vec<TransactionInfo> = block
         .transactions
         .into_iter()
@@ -125,54 +125,66 @@ fn process_block(
             .collect(),
         });

-        let legacy_compute_budget = message.instructions().iter().find_map(|i| {
-            if i.program_id(message.static_account_keys())
-                .eq(&compute_budget::id())
-            {
-                if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
-                    units,
-                    additional_fee,
-                }) = try_from_slice_unchecked(i.data.as_slice())
+        let legacy_compute_budget: Option<(u32, Option<u64>)> =
+            message.instructions().iter().find_map(|i| {
+                if i.program_id(message.static_account_keys())
+                    .eq(&compute_budget::id())
                 {
-                    return Some((units, additional_fee));
+                    if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
+                        units,
+                        additional_fee,
+                    }) = try_from_slice_unchecked(i.data.as_slice())
+                    {
+                        if additional_fee > 0 {
+                            return Some((
+                                units,
+                                Some(((units * 1000) / additional_fee) as u64),
+                            ));
+                        } else {
+                            return Some((units, None));
+                        }
+                    }
                 }
-            }
-            None
-        });
+                None
+            });

-        let mut cu_requested = message.instructions().iter().find_map(|i| {
-            if i.program_id(message.static_account_keys())
-                .eq(&compute_budget::id())
-            {
-                if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
-                    try_from_slice_unchecked(i.data.as_slice())
+        let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
+        let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None);
+
+        let cu_requested = message
+            .instructions()
+            .iter()
+            .find_map(|i| {
+                if i.program_id(message.static_account_keys())
+                    .eq(&compute_budget::id())
                 {
-                    return Some(limit);
+                    if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
+                        try_from_slice_unchecked(i.data.as_slice())
+                    {
+                        return Some(limit);
+                    }
                 }
-            }
-            None
-        });
+                None
+            })
+            .or(legacy_cu_requested);

-        let mut prioritization_fees = message.instructions().iter().find_map(|i| {
-            if i.program_id(message.static_account_keys())
-                .eq(&compute_budget::id())
-            {
-                if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
-                    try_from_slice_unchecked(i.data.as_slice())
+        let prioritization_fees = message
+            .instructions()
+            .iter()
+            .find_map(|i| {
+                if i.program_id(message.static_account_keys())
+                    .eq(&compute_budget::id())
                 {
-                    return Some(price);
+                    if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
+                        try_from_slice_unchecked(i.data.as_slice())
+                    {
+                        return Some(price);
+                    }
                 }
-            }
-            None
-        });
-
-        if let Some((units, additional_fee)) = legacy_compute_budget {
-            cu_requested = Some(units);
-            if additional_fee > 0 {
-                prioritization_fees = Some(((units * 1000) / additional_fee).into())
-            }
-        };
+                None
+            })
+            .or(legacy_prioritization_fees);

         Some(TransactionInfo {
             signature: signature.to_string(),
@@ -201,7 +213,6 @@ fn process_block(
                     }
                     yellowstone_grpc_proto::prelude::RewardType::Voting => Some(RewardType::Voting),
                 },
-                // Warn: idk how to convert string to option u8
                 commission: None,
             })
             .collect_vec()
@@ -216,7 +227,7 @@ fn process_block(
         None
     };

-    ProcessedBlock {
+    ProducedBlock {
         txs,
         block_height: block
             .block_height
@@ -233,7 +244,7 @@ fn process_block(

 pub fn create_block_processing_task(
     grpc_addr: String,
-    block_sx: Sender<ProcessedBlock>,
+    block_sx: Sender<ProducedBlock>,
     commitment_level: CommitmentLevel,
 ) -> AnyhowJoinHandle {
     let mut blocks_subs = HashMap::new();
@@ -286,12 +297,12 @@ pub fn create_block_processing_task(
             UpdateOneof::Ping(_) => {
                 log::trace!("GRPC Ping");
             }
-            k => {
-                bail!("Unexpected update: {k:?}");
+            u => {
+                bail!("Unexpected update: {u:?}");
             }
         };
     }
-    bail!("gyser slot stream ended");
+    bail!("geyser slot stream ended");
     })
 }

@@ -316,7 +327,7 @@ pub fn create_grpc_subscription(
     let version = client.get_version().await?.version;
     if version != expected_grpc_version {
         log::warn!(
-            "Expected version {:?}, got {:?}",
+            "Expected grpc version {:?}, got {:?}, continue",
             expected_grpc_version,
             version
         );
@@ -358,7 +369,7 @@ pub fn create_grpc_subscription(
             }
         };
     }
-    bail!("gyser slot stream ended");
+    bail!("geyser slot stream ended");
     });

     let block_confirmed_task: AnyhowJoinHandle = create_block_processing_task(
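The reworked hunk above derives a prioritization fee from the deprecated `RequestUnitsDeprecated` instruction and then lets explicit `SetComputeUnitLimit` / `SetComputeUnitPrice` values take precedence via `Option::or`. Below is a minimal standalone sketch of that fallback rule; the `LegacyBudget` struct and `pick_compute_budget` helper are illustrative names, not part of the PR.

```rust
#[derive(Clone, Copy)]
struct LegacyBudget {
    units: u32,
    additional_fee: u32,
}

fn pick_compute_budget(
    explicit_limit: Option<u32>,
    explicit_price: Option<u64>,
    legacy: Option<LegacyBudget>,
) -> (Option<u32>, Option<u64>) {
    // A zero additional_fee yields no derived price, matching the hunk above.
    let legacy_price = legacy.and_then(|l| {
        (l.additional_fee > 0).then(|| ((l.units * 1000) / l.additional_fee) as u64)
    });
    let legacy_units = legacy.map(|l| l.units);
    // Explicit compute-budget instructions win; legacy values only fill the gaps.
    (explicit_limit.or(legacy_units), explicit_price.or(legacy_price))
}

fn main() {
    let legacy = Some(LegacyBudget { units: 200_000, additional_fee: 100 });
    // Legacy-only transaction: both values are derived from RequestUnitsDeprecated.
    assert_eq!(
        pick_compute_budget(None, None, legacy),
        (Some(200_000), Some(2_000_000))
    );
    // An explicit price wins over the derived legacy one.
    assert_eq!(pick_compute_budget(None, Some(42), legacy).1, Some(42));
}
```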
diff --git a/cluster-endpoints/src/json_rpc_leaders_getter.rs b/cluster-endpoints/src/json_rpc_leaders_getter.rs
index 3f75d5f1..207311ed 100644
--- a/cluster-endpoints/src/json_rpc_leaders_getter.rs
+++ b/cluster-endpoints/src/json_rpc_leaders_getter.rs
@@ -8,6 +8,8 @@ use solana_lite_rpc_core::{
 use std::{collections::VecDeque, sync::Arc};
 use tokio::sync::RwLock;

+// Stores leaders for slots from older to newer in the leader schedule;
+// regularly removes old leaders and adds new ones.
 pub struct JsonRpcLeaderGetter {
     rpc_client: Arc<RpcClient>,
     leader_schedule: RwLock<VecDeque<LeaderData>>,
@@ -40,14 +42,9 @@ impl JsonRpcLeaderGetter {
         }
         let last_slot_needed = slot + self.leaders_to_cache_count;

-        let queue_end_slot = leader_queue.back().map_or(slot, |x| x.leader_slot);
+        let first_slot_to_fetch = leader_queue.back().map_or(slot, |x| x.leader_slot + 1);

-        if last_slot_needed > queue_end_slot {
-            let first_slot_to_fetch = if leader_queue.is_empty() {
-                queue_end_slot
-            } else {
-                queue_end_slot + 1
-            };
+        if last_slot_needed >= first_slot_to_fetch {
             let leaders = self
                 .rpc_client
                 .get_slot_leaders(first_slot_to_fetch, last_slot_needed - first_slot_to_fetch)
@@ -83,17 +80,17 @@ impl LeaderFetcherInterface for JsonRpcLeaderGetter {
             || schedule.front().unwrap().leader_slot > from
             || schedule.back().unwrap().leader_slot < to
         {
+            // rebuilding the leader schedule
             drop(schedule);
             self.update_leader_schedule(from).await?;
             self.leader_schedule.read().await
         } else {
             schedule
         };
-        let ls = schedule
+        Ok(schedule
             .iter()
             .filter(|x| x.leader_slot >= from && x.leader_slot <= to)
             .cloned()
-            .collect_vec();
-        Ok(ls)
+            .collect_vec())
     }
 }
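The simplification above replaces the empty/non-empty special case with a single resume point: one past the newest cached slot, or the requested slot when the cache is empty. Here is a dependency-free sketch of the window arithmetic, assuming the queue is ordered oldest to newest; `fetch_window` is a made-up name, and the returned pair corresponds to the `(start, limit)` arguments the real code passes to `rpc_client.get_slot_leaders`.

```rust
use std::collections::VecDeque;

fn fetch_window(queue: &VecDeque<u64>, slot: u64, leaders_to_cache_count: u64) -> Option<(u64, u64)> {
    let last_slot_needed = slot + leaders_to_cache_count;
    // Resume one past the newest cached slot, or at `slot` when the cache is empty.
    let first_slot_to_fetch = queue.back().map_or(slot, |&s| s + 1);
    // None means the cache already covers the requested window.
    (last_slot_needed >= first_slot_to_fetch)
        .then(|| (first_slot_to_fetch, last_slot_needed - first_slot_to_fetch))
}

fn main() {
    // Empty cache: fetch the whole window starting at the current slot.
    assert_eq!(fetch_window(&VecDeque::new(), 100, 10), Some((100, 10)));
    // Cache already ends at slot 105: only the missing tail is requested.
    let cached: VecDeque<u64> = (100..=105).collect();
    assert_eq!(fetch_window(&cached, 100, 10), Some((106, 4)));
}
```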
diff --git a/cluster-endpoints/src/rpc_polling/poll_blocks.rs b/cluster-endpoints/src/rpc_polling/poll_blocks.rs
index b0ca50bb..fd680e46 100644
--- a/cluster-endpoints/src/rpc_polling/poll_blocks.rs
+++ b/cluster-endpoints/src/rpc_polling/poll_blocks.rs
@@ -2,7 +2,7 @@ use anyhow::Context;
 use solana_client::nonblocking::rpc_client::RpcClient;
 use solana_lite_rpc_core::{
     structures::{
-        processed_block::{ProcessedBlock, TransactionInfo},
+        produced_block::{ProducedBlock, TransactionInfo},
         slot_notification::SlotNotification,
     },
     AnyhowJoinHandle,
@@ -28,7 +28,7 @@ pub async fn process_block(
     rpc_client: &RpcClient,
     slot: Slot,
     commitment_config: CommitmentConfig,
-) -> Option<ProcessedBlock> {
+) -> Option<ProducedBlock> {
     let block = rpc_client
         .get_block_with_config(
             slot,
@@ -152,7 +152,7 @@ pub async fn process_block(

     let block_time = block.block_time.unwrap_or(0) as u64;

-    Some(ProcessedBlock {
+    Some(ProducedBlock {
         txs,
         block_height,
         leader_id,
@@ -166,7 +166,7 @@ pub async fn process_block(

 pub fn poll_block(
     rpc_client: Arc<RpcClient>,
-    block_notification_sender: Sender<ProcessedBlock>,
+    block_notification_sender: Sender<ProducedBlock>,
     slot_notification: Receiver<SlotNotification>,
 ) -> Vec<AnyhowJoinHandle> {
     let task_spawner: AnyhowJoinHandle = tokio::spawn(async move {
diff --git a/core/src/stores/block_information_store.rs b/core/src/stores/block_information_store.rs
index 85823c77..216901e8 100644
--- a/core/src/stores/block_information_store.rs
+++ b/core/src/stores/block_information_store.rs
@@ -2,12 +2,14 @@ use dashmap::DashMap;
 use log::info;

 use solana_sdk::{
-    clock::MAX_RECENT_BLOCKHASHES, commitment_config::CommitmentConfig, slot_history::Slot,
+    clock::MAX_RECENT_BLOCKHASHES,
+    commitment_config::{CommitmentConfig, CommitmentLevel},
+    slot_history::Slot,
 };
 use std::sync::Arc;
-use tokio::{sync::RwLock, time::Instant};
+use tokio::sync::RwLock;

-use crate::structures::processed_block::ProcessedBlock;
+use crate::structures::produced_block::ProducedBlock;

 #[derive(Clone, Debug)]
 pub struct BlockInformation {
@@ -16,16 +18,18 @@ pub struct BlockInformation {
     pub last_valid_blockheight: u64,
     pub cleanup_slot: Slot,
     pub blockhash: String,
+    pub commitment_config: CommitmentConfig,
 }

 impl BlockInformation {
-    pub fn from_block(block: &ProcessedBlock) -> Self {
+    pub fn from_block(block: &ProducedBlock) -> Self {
         BlockInformation {
             slot: block.slot,
             block_height: block.block_height,
             last_valid_blockheight: block.block_height + MAX_RECENT_BLOCKHASHES as u64,
             cleanup_slot: block.block_height + 1000,
             blockhash: block.blockhash.clone(),
+            commitment_config: block.commitment_config,
         }
     }
 }
@@ -36,7 +40,6 @@ pub struct BlockInformationStore {
     latest_processed_block: Arc<RwLock<BlockInformation>>,
     latest_confirmed_block: Arc<RwLock<BlockInformation>>,
     latest_finalized_block: Arc<RwLock<BlockInformation>>,
-    last_add_block_metric: Arc<RwLock<Instant>>,
 }

 impl BlockInformationStore {
@@ -53,7 +56,6 @@ impl BlockInformationStore {
             latest_confirmed_block: Arc::new(RwLock::new(latest_finalized_block.clone())),
             latest_finalized_block: Arc::new(RwLock::new(latest_finalized_block)),
             blocks,
-            last_add_block_metric: Arc::new(RwLock::new(Instant::now())),
         }
     }

@@ -103,31 +105,40 @@ impl BlockInformationStore {
             .clone()
     }

-    pub async fn add_block(
-        &self,
-        block_info: BlockInformation,
-        commitment_config: CommitmentConfig,
-    ) {
-        // create context for add block metric
-        {
-            let mut last_add_block_metric = self.last_add_block_metric.write().await;
-            *last_add_block_metric = Instant::now();
-        }
-
+    pub async fn add_block(&self, block_info: BlockInformation) -> bool {
         // save slot copy to avoid borrow issues
         let slot = block_info.slot;
-
-        // Write to block store first in order to prevent
-        // any race condition i.e prevent some one to
-        // ask the map what it doesn't have rn
-        self.blocks
-            .insert(block_info.blockhash.clone(), block_info.clone());
+        let commitment_config = block_info.commitment_config;
+        // check whether the block has already been added with a higher commitment level
+        match self.blocks.get_mut(&block_info.blockhash) {
+            Some(mut prev_block_info) => {
+                let should_update = match prev_block_info.commitment_config.commitment {
+                    CommitmentLevel::Finalized => false, // never update a block that is already finalized
+                    CommitmentLevel::Confirmed => {
+                        commitment_config == CommitmentConfig::finalized()
+                    } // only update a confirmed block with a finalized one
+                    _ => {
+                        commitment_config == CommitmentConfig::confirmed()
+                            || commitment_config == CommitmentConfig::finalized()
+                    }
+                };
+                if !should_update {
+                    return false;
+                }
+                *prev_block_info = block_info.clone();
+            }
+            None => {
+                self.blocks
+                    .insert(block_info.blockhash.clone(), block_info.clone());
+            }
+        }

         // update latest block
         let latest_block = self.get_latest_block_arc(commitment_config);
         if slot > latest_block.read().await.slot {
             *latest_block.write().await = block_info;
         }
+        true
     }

     pub async fn clean(&self) {
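The new `add_block` only ever promotes an existing entry to a stronger commitment: finalized entries are immutable, confirmed entries accept only finalized updates, and anything else accepts confirmed or finalized. That rule is exactly a strict ordering on the three levels. A self-contained sketch, where the `Level` enum stands in for `CommitmentLevel` (ignoring its deprecated variants) and `should_update` mirrors the match above:

```rust
// Declaration order defines the derived ordering: Processed < Confirmed < Finalized.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
enum Level {
    Processed,
    Confirmed,
    Finalized,
}

// Strictly increasing: re-adding at the same level is a no-op,
// and a finalized entry is never overwritten.
fn should_update(prev: Level, new: Level) -> bool {
    new > prev
}

fn main() {
    assert!(should_update(Level::Processed, Level::Confirmed));
    assert!(should_update(Level::Confirmed, Level::Finalized));
    assert!(!should_update(Level::Finalized, Level::Confirmed));
    assert!(!should_update(Level::Confirmed, Level::Confirmed));
}
```

Returning `bool` from `add_block` lets callers distinguish a real insert or promotion from a stale duplicate, which the old void signature could not express.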
diff --git a/core/src/stores/subscription_store.rs b/core/src/stores/subscription_store.rs
index 9e3b0b14..ae683d6c 100644
--- a/core/src/stores/subscription_store.rs
+++ b/core/src/stores/subscription_store.rs
@@ -1,4 +1,4 @@
-use crate::{structures::processed_block::TransactionInfo, types::SubscptionHanderSink};
+use crate::{structures::produced_block::TransactionInfo, types::SubscptionHanderSink};
 use dashmap::DashMap;
 use solana_sdk::{
     commitment_config::{CommitmentConfig, CommitmentLevel},
diff --git a/core/src/structures/identity_stakes.rs b/core/src/structures/identity_stakes.rs
index a38ed300..fcb11569 100644
--- a/core/src/structures/identity_stakes.rs
+++ b/core/src/structures/identity_stakes.rs
@@ -64,7 +64,7 @@ impl IdentityStakes {
         };

         log::info!(
-            "Idenity stakes {}, {}, {}, {}",
+            "Identity stakes {}, {}, {}, {}",
             identity_stakes.total_stakes,
             identity_stakes.min_stakes,
             identity_stakes.max_stakes,
diff --git a/core/src/structures/mod.rs b/core/src/structures/mod.rs
index 2e50b636..8d642be1 100644
--- a/core/src/structures/mod.rs
+++ b/core/src/structures/mod.rs
@@ -3,7 +3,7 @@
 pub mod identity_stakes;
 pub mod leader_data;
 pub mod notifications;
-pub mod processed_block;
+pub mod produced_block;
 pub mod proxy_request_format;
 pub mod rotating_queue;
 pub mod slot_notification;
diff --git a/core/src/structures/processed_block.rs b/core/src/structures/produced_block.rs
similarity index 95%
rename from core/src/structures/processed_block.rs
rename to core/src/structures/produced_block.rs
index 9d5bce51..cd80bf45 100644
--- a/core/src/structures/processed_block.rs
+++ b/core/src/structures/produced_block.rs
@@ -12,7 +12,7 @@ pub struct TransactionInfo {
 }

 #[derive(Default, Debug, Clone)]
-pub struct ProcessedBlock {
+pub struct ProducedBlock {
     pub txs: Vec<TransactionInfo>,
     pub leader_id: Option<String>,
     pub blockhash: String,
diff --git a/core/src/traits/leaders_fetcher_interface.rs b/core/src/traits/leaders_fetcher_interface.rs
index 192737b3..ae8e257b 100644
--- a/core/src/traits/leaders_fetcher_interface.rs
+++ b/core/src/traits/leaders_fetcher_interface.rs
@@ -4,5 +4,6 @@ use solana_sdk::slot_history::Slot;

 #[async_trait]
 pub trait LeaderFetcherInterface: Send + Sync {
+    // should return the leader schedule for all slots >= from and <= to
     async fn get_slot_leaders(&self, from: Slot, to: Slot) -> anyhow::Result<Vec<LeaderData>>;
 }
diff --git a/core/src/types.rs b/core/src/types.rs
index 54ad7020..96ba3cee 100644
--- a/core/src/types.rs
+++ b/core/src/types.rs
@@ -4,11 +4,11 @@ use solana_rpc_client_api::response::{RpcContactInfo, RpcVoteAccountStatus};
 use tokio::sync::broadcast::Receiver;

 use crate::{
-    structures::{processed_block::ProcessedBlock, slot_notification::SlotNotification},
+    structures::{produced_block::ProducedBlock, slot_notification::SlotNotification},
     traits::subscription_sink::SubscriptionSink,
 };

-pub type BlockStream = Receiver<ProcessedBlock>;
+pub type BlockStream = Receiver<ProducedBlock>;
 pub type SlotStream = Receiver<SlotNotification>;
 pub type VoteAccountStream = Receiver<RpcVoteAccountStatus>;
 pub type ClusterInfoStream = Receiver<Vec<RpcContactInfo>>;
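The new doc comment on `LeaderFetcherInterface` pins down an inclusive-range contract: every slot `s` with `from <= s <= to` must be covered. A hedged sketch of that contract with a fixed in-memory schedule; the trait and `LeaderData` are reduced here to the one field the filter uses (the real definitions live in `core/src/traits` and `core/src/structures/leader_data.rs`), and `FixedSchedule` is a mock invented for illustration:

```rust
use async_trait::async_trait;

#[derive(Clone, Debug)]
struct LeaderData {
    leader_slot: u64,
}

#[async_trait]
trait LeaderFetcherInterface: Send + Sync {
    async fn get_slot_leaders(&self, from: u64, to: u64) -> anyhow::Result<Vec<LeaderData>>;
}

// A fixed in-memory schedule that honors the documented contract.
struct FixedSchedule {
    schedule: Vec<LeaderData>,
}

#[async_trait]
impl LeaderFetcherInterface for FixedSchedule {
    async fn get_slot_leaders(&self, from: u64, to: u64) -> anyhow::Result<Vec<LeaderData>> {
        // Inclusive on both ends, matching the comment above.
        Ok(self
            .schedule
            .iter()
            .filter(|x| x.leader_slot >= from && x.leader_slot <= to)
            .cloned()
            .collect())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let fetcher = FixedSchedule {
        schedule: (0..16).map(|slot| LeaderData { leader_slot: slot }).collect(),
    };
    // Both endpoints are returned: slots 4, 5, 6 and 7.
    assert_eq!(fetcher.get_slot_leaders(4, 7).await?.len(), 4);
    Ok(())
}
```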
diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs
index e7b13670..77113717 100644
--- a/lite-rpc/src/main.rs
+++ b/lite-rpc/src/main.rs
@@ -25,7 +25,7 @@ use solana_lite_rpc_core::stores::{
 };
 use solana_lite_rpc_core::structures::{
     identity_stakes::IdentityStakes, notifications::NotificationSender,
-    processed_block::ProcessedBlock,
+    produced_block::ProducedBlock,
 };
 use solana_lite_rpc_core::types::BlockStream;
 use solana_lite_rpc_core::AnyhowJoinHandle;
@@ -48,7 +48,7 @@ use crate::rpc_tester::RpcTester;
 async fn get_latest_block(
     mut block_stream: BlockStream,
     commitment_config: CommitmentConfig,
-) -> ProcessedBlock {
+) -> ProducedBlock {
     while let Ok(block) = block_stream.recv().await {
         if block.commitment_config == commitment_config {
             return block;
diff --git a/services/src/data_caching_service.rs b/services/src/data_caching_service.rs
index dbc4c9ff..659d9812 100644
--- a/services/src/data_caching_service.rs
+++ b/services/src/data_caching_service.rs
@@ -51,12 +51,10 @@ impl DataCachingService {
         let mut block_notifier = block_notifier;
         loop {
             let block = block_notifier.recv().await.expect("Should recv blocks");
+
             data_cache
                 .block_store
-                .add_block(
-                    BlockInformation::from_block(&block),
-                    block.commitment_config,
-                )
+                .add_block(BlockInformation::from_block(&block))
                 .await;

             let confirmation_status = match block.commitment_config.commitment {
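With `BlockStream` now carrying `ProducedBlock` (which embeds its own `commitment_config`), `get_latest_block` in main.rs simply receives from the broadcast channel until a block matches the requested commitment. An illustrative model of that loop; `MiniBlock` and the `Commitment` enum are stand-ins for the real types, and unlike the real helper this one returns `Option` so it can terminate when the channel closes:

```rust
use tokio::sync::broadcast;

#[derive(Clone, Debug, PartialEq)]
enum Commitment {
    Processed,
    Confirmed,
}

#[derive(Clone, Debug)]
struct MiniBlock {
    blockhash: String,
    commitment: Commitment,
}

// Keep receiving until the stream yields a block at the desired commitment,
// or None if the channel closes first.
async fn get_latest_block(
    mut block_stream: broadcast::Receiver<MiniBlock>,
    commitment: Commitment,
) -> Option<MiniBlock> {
    while let Ok(block) = block_stream.recv().await {
        if block.commitment == commitment {
            return Some(block);
        }
    }
    None
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel(8);
    tx.send(MiniBlock { blockhash: "a".into(), commitment: Commitment::Processed }).unwrap();
    tx.send(MiniBlock { blockhash: "b".into(), commitment: Commitment::Confirmed }).unwrap();
    drop(tx); // close the channel so the loop can terminate
    let block = get_latest_block(rx, Commitment::Confirmed).await.unwrap();
    assert_eq!(block.blockhash, "b");
}
```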