changes after groovie's review

Godmode Galactus 2023-09-05 10:28:37 +02:00
parent 5b7dd5de6b
commit 7ce00a2642
12 changed files with 116 additions and 98 deletions

View File

@ -8,7 +8,7 @@ use itertools::Itertools;
use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{ use solana_lite_rpc_core::{
structures::{ structures::{
processed_block::{ProcessedBlock, TransactionInfo}, produced_block::{ProducedBlock, TransactionInfo},
slot_notification::SlotNotification, slot_notification::SlotNotification,
}, },
AnyhowJoinHandle, AnyhowJoinHandle,
@ -39,7 +39,7 @@ use yellowstone_grpc_proto::prelude::{
fn process_block( fn process_block(
block: SubscribeUpdateBlock, block: SubscribeUpdateBlock,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> ProcessedBlock { ) -> ProducedBlock {
let txs: Vec<TransactionInfo> = block let txs: Vec<TransactionInfo> = block
.transactions .transactions
.into_iter() .into_iter()
@ -125,54 +125,66 @@ fn process_block(
.collect(), .collect(),
}); });
-    let legacy_compute_budget = message.instructions().iter().find_map(|i| {
-        if i.program_id(message.static_account_keys())
-            .eq(&compute_budget::id())
-        {
-            if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
-                units,
-                additional_fee,
-            }) = try_from_slice_unchecked(i.data.as_slice())
-            {
-                return Some((units, additional_fee));
-            }
-        }
-        None
-    });
-    let mut cu_requested = message.instructions().iter().find_map(|i| {
-        if i.program_id(message.static_account_keys())
-            .eq(&compute_budget::id())
-        {
-            if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
-                try_from_slice_unchecked(i.data.as_slice())
-            {
-                return Some(limit);
-            }
-        }
-        None
-    });
-    let mut prioritization_fees = message.instructions().iter().find_map(|i| {
-        if i.program_id(message.static_account_keys())
-            .eq(&compute_budget::id())
-        {
-            if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
-                try_from_slice_unchecked(i.data.as_slice())
-            {
-                return Some(price);
-            }
-        }
-        None
-    });
-    if let Some((units, additional_fee)) = legacy_compute_budget {
-        cu_requested = Some(units);
-        if additional_fee > 0 {
-            prioritization_fees = Some(((units * 1000) / additional_fee).into())
-        }
-    };

+    let legacy_compute_budget: Option<(u32, Option<u64>)> =
+        message.instructions().iter().find_map(|i| {
+            if i.program_id(message.static_account_keys())
+                .eq(&compute_budget::id())
+            {
+                if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
+                    units,
+                    additional_fee,
+                }) = try_from_slice_unchecked(i.data.as_slice())
+                {
+                    if additional_fee > 0 {
+                        return Some((
+                            units,
+                            Some(((units * 1000) / additional_fee) as u64),
+                        ));
+                    } else {
+                        return Some((units, None));
+                    }
+                }
+            }
+            None
+        });
+    let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
+    let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None);
+    let cu_requested = message
+        .instructions()
+        .iter()
+        .find_map(|i| {
+            if i.program_id(message.static_account_keys())
+                .eq(&compute_budget::id())
+            {
+                if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
+                    try_from_slice_unchecked(i.data.as_slice())
+                {
+                    return Some(limit);
+                }
+            }
+            None
+        })
+        .or(legacy_cu_requested);
+    let prioritization_fees = message
+        .instructions()
+        .iter()
+        .find_map(|i| {
+            if i.program_id(message.static_account_keys())
+                .eq(&compute_budget::id())
+            {
+                if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
+                    try_from_slice_unchecked(i.data.as_slice())
+                {
+                    return Some(price);
+                }
+            }
+            None
+        })
+        .or(legacy_prioritization_fees);
Some(TransactionInfo { Some(TransactionInfo {
signature: signature.to_string(), signature: signature.to_string(),
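For reference, a minimal self-contained sketch of the fallback this hunk introduces: explicit SetComputeUnitLimit / SetComputeUnitPrice instructions win, and values derived from the deprecated RequestUnitsDeprecated instruction are only used via `.or(..)` when nothing explicit is present. `CbInstruction` and `derive_budget` below are simplified stand-ins, not the real solana_sdk / lite-rpc types.

```rust
// Simplified stand-ins, not the real solana_sdk ComputeBudgetInstruction.
#[derive(Clone, Copy)]
enum CbInstruction {
    RequestUnitsDeprecated { units: u32, additional_fee: u32 },
    SetComputeUnitLimit(u32),
    SetComputeUnitPrice(u64),
}

// Same precedence as the new code: explicit instructions first, legacy as fallback.
fn derive_budget(ixs: &[CbInstruction]) -> (Option<u32>, Option<u64>) {
    let legacy: Option<(u32, Option<u64>)> = ixs.iter().find_map(|ix| match *ix {
        CbInstruction::RequestUnitsDeprecated { units, additional_fee } => {
            if additional_fee > 0 {
                // same expression as in the diff
                Some((units, Some(((units * 1000) / additional_fee) as u64)))
            } else {
                Some((units, None))
            }
        }
        _ => None,
    });
    let legacy_cu = legacy.map(|x| x.0);
    let legacy_price = legacy.and_then(|x| x.1);

    let cu_requested = ixs
        .iter()
        .find_map(|ix| match *ix {
            CbInstruction::SetComputeUnitLimit(limit) => Some(limit),
            _ => None,
        })
        .or(legacy_cu);
    let prioritization_fee = ixs
        .iter()
        .find_map(|ix| match *ix {
            CbInstruction::SetComputeUnitPrice(price) => Some(price),
            _ => None,
        })
        .or(legacy_price);
    (cu_requested, prioritization_fee)
}

fn main() {
    // Only the deprecated instruction present: both values come from the fallback.
    let ixs = [CbInstruction::RequestUnitsDeprecated { units: 200_000, additional_fee: 1_000 }];
    assert_eq!(derive_budget(&ixs), (Some(200_000), Some(200_000)));
}
```

Note the precedence change: previously the deprecated instruction overwrote any explicit values, while here it only fills the gaps.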
@ -201,7 +213,6 @@ fn process_block(
} }
yellowstone_grpc_proto::prelude::RewardType::Voting => Some(RewardType::Voting), yellowstone_grpc_proto::prelude::RewardType::Voting => Some(RewardType::Voting),
}, },
// Warn: idk how to convert string to option u8
commission: None, commission: None,
}) })
.collect_vec() .collect_vec()
@ -216,7 +227,7 @@ fn process_block(
None None
}; };
ProcessedBlock { ProducedBlock {
txs, txs,
block_height: block block_height: block
.block_height .block_height
@ -233,7 +244,7 @@ fn process_block(
pub fn create_block_processing_task( pub fn create_block_processing_task(
grpc_addr: String, grpc_addr: String,
block_sx: Sender<ProcessedBlock>, block_sx: Sender<ProducedBlock>,
commitment_level: CommitmentLevel, commitment_level: CommitmentLevel,
) -> AnyhowJoinHandle { ) -> AnyhowJoinHandle {
let mut blocks_subs = HashMap::new(); let mut blocks_subs = HashMap::new();
@ -286,12 +297,12 @@ pub fn create_block_processing_task(
UpdateOneof::Ping(_) => { UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping"); log::trace!("GRPC Ping");
} }
k => { u => {
bail!("Unexpected update: {k:?}"); bail!("Unexpected update: {u:?}");
} }
}; };
} }
bail!("gyser slot stream ended"); bail!("geyser slot stream ended");
}) })
} }
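A rough, self-contained sketch of the loop this hunk is part of, with hypothetical stand-in types (`UpdateStub`, `ProducedBlockStub`) in place of the yellowstone update enum and the real channel wiring: blocks are forwarded, pings are ignored, any other update aborts the task, and the task bails when the stream ends.

```rust
use anyhow::bail;
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;

#[derive(Clone, Debug)]
struct ProducedBlockStub { slot: u64 }

#[derive(Debug)]
enum UpdateStub { Block(ProducedBlockStub), Ping, Pong, Slot(u64) }

fn spawn_block_task(
    mut updates: mpsc::Receiver<UpdateStub>,
    block_sx: broadcast::Sender<ProducedBlockStub>,
) -> JoinHandle<anyhow::Result<()>> {
    tokio::spawn(async move {
        while let Some(update) = updates.recv().await {
            match update {
                UpdateStub::Block(b) => {
                    // stop if nobody is listening for blocks any more
                    if block_sx.send(b).is_err() {
                        bail!("no block subscribers left");
                    }
                }
                UpdateStub::Ping => { /* keep-alive, nothing to do */ }
                // anything unexpected aborts the task, as in the hunk above
                u => bail!("Unexpected update: {u:?}"),
            }
        }
        bail!("geyser update stream ended")
    })
}
```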
@ -316,7 +327,7 @@ pub fn create_grpc_subscription(
let version = client.get_version().await?.version; let version = client.get_version().await?.version;
if version != expected_grpc_version { if version != expected_grpc_version {
log::warn!( log::warn!(
"Expected version {:?}, got {:?}", "Expected grpc version {:?}, got {:?}, continue",
expected_grpc_version, expected_grpc_version,
version version
); );
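A small sketch of the check as this hunk leaves it: a mismatching geyser gRPC version is only logged as a warning and processing continues. `check_grpc_version` is a hypothetical helper, not a function in the codebase.

```rust
// Warn-and-continue version check, mirroring the log message in this hunk.
fn check_grpc_version(expected_grpc_version: &str, version: &str) {
    if version != expected_grpc_version {
        log::warn!(
            "Expected grpc version {:?}, got {:?}, continue",
            expected_grpc_version,
            version
        );
    }
}
```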
@ -358,7 +369,7 @@ pub fn create_grpc_subscription(
} }
}; };
} }
bail!("gyser slot stream ended"); bail!("geyser slot stream ended");
}); });
let block_confirmed_task: AnyhowJoinHandle = create_block_processing_task( let block_confirmed_task: AnyhowJoinHandle = create_block_processing_task(

View File

@ -8,6 +8,8 @@ use solana_lite_rpc_core::{
use std::{collections::VecDeque, sync::Arc}; use std::{collections::VecDeque, sync::Arc};
use tokio::sync::RwLock; use tokio::sync::RwLock;
// Stores leaders for slots from older to newer in leader schedule
// regularly removes old leaders and adds new ones
pub struct JsonRpcLeaderGetter { pub struct JsonRpcLeaderGetter {
rpc_client: Arc<RpcClient>, rpc_client: Arc<RpcClient>,
leader_schedule: RwLock<VecDeque<LeaderData>>, leader_schedule: RwLock<VecDeque<LeaderData>>,
@ -40,14 +42,9 @@ impl JsonRpcLeaderGetter {
} }
let last_slot_needed = slot + self.leaders_to_cache_count; let last_slot_needed = slot + self.leaders_to_cache_count;
let queue_end_slot = leader_queue.back().map_or(slot, |x| x.leader_slot); let first_slot_to_fetch = leader_queue.back().map_or(slot, |x| x.leader_slot + 1);
if last_slot_needed > queue_end_slot { if last_slot_needed >= first_slot_to_fetch {
let first_slot_to_fetch = if leader_queue.is_empty() {
queue_end_slot
} else {
queue_end_slot + 1
};
let leaders = self let leaders = self
.rpc_client .rpc_client
.get_slot_leaders(first_slot_to_fetch, last_slot_needed - first_slot_to_fetch) .get_slot_leaders(first_slot_to_fetch, last_slot_needed - first_slot_to_fetch)
@ -83,17 +80,17 @@ impl LeaderFetcherInterface for JsonRpcLeaderGetter {
|| schedule.front().unwrap().leader_slot > from || schedule.front().unwrap().leader_slot > from
|| schedule.back().unwrap().leader_slot < to || schedule.back().unwrap().leader_slot < to
{ {
// rebuilding the leader schedule
drop(schedule); drop(schedule);
self.update_leader_schedule(from).await?; self.update_leader_schedule(from).await?;
self.leader_schedule.read().await self.leader_schedule.read().await
} else { } else {
schedule schedule
}; };
let ls = schedule Ok(schedule
.iter() .iter()
.filter(|x| x.leader_slot >= from && x.leader_slot <= to) .filter(|x| x.leader_slot >= from && x.leader_slot <= to)
.cloned() .cloned()
.collect_vec(); .collect_vec())
Ok(ls)
} }
} }
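A self-contained sketch of the range computation after this change, assuming `get_slot_leaders(start, count)` takes a start slot plus a count, as the call above suggests; `CachedLeader` and `slots_to_fetch` are stand-ins for `LeaderData` and the real method.

```rust
use std::collections::VecDeque;

// Stand-in for LeaderData: only the slot matters for the range computation.
#[derive(Clone, Debug)]
struct CachedLeader { leader_slot: u64 }

// Fetch only the slots that are not yet cached, starting one past the newest
// cached leader slot (or at the current slot when the cache is empty).
fn slots_to_fetch(
    queue: &VecDeque<CachedLeader>,
    current_slot: u64,
    leaders_to_cache_count: u64,
) -> Option<(u64, u64)> {
    let last_slot_needed = current_slot + leaders_to_cache_count;
    let first_slot_to_fetch = queue.back().map_or(current_slot, |x| x.leader_slot + 1);
    if last_slot_needed >= first_slot_to_fetch {
        Some((first_slot_to_fetch, last_slot_needed - first_slot_to_fetch))
    } else {
        None
    }
}

fn main() {
    // Empty cache: fetch from the current slot onwards.
    assert_eq!(slots_to_fetch(&VecDeque::new(), 100, 10), Some((100, 10)));
    // Cache already ends at slot 105: only fetch from 106.
    let queue: VecDeque<_> = (100..=105).map(|s| CachedLeader { leader_slot: s }).collect();
    assert_eq!(slots_to_fetch(&queue, 100, 10), Some((106, 4)));
}
```

Collapsing the empty and non-empty cases into a single `map_or` is what lets the condition become one `>=` comparison.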

View File

@ -2,7 +2,7 @@ use anyhow::Context;
use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{ use solana_lite_rpc_core::{
structures::{ structures::{
processed_block::{ProcessedBlock, TransactionInfo}, produced_block::{ProducedBlock, TransactionInfo},
slot_notification::SlotNotification, slot_notification::SlotNotification,
}, },
AnyhowJoinHandle, AnyhowJoinHandle,
@ -28,7 +28,7 @@ pub async fn process_block(
rpc_client: &RpcClient, rpc_client: &RpcClient,
slot: Slot, slot: Slot,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> Option<ProcessedBlock> { ) -> Option<ProducedBlock> {
let block = rpc_client let block = rpc_client
.get_block_with_config( .get_block_with_config(
slot, slot,
@ -152,7 +152,7 @@ pub async fn process_block(
let block_time = block.block_time.unwrap_or(0) as u64; let block_time = block.block_time.unwrap_or(0) as u64;
Some(ProcessedBlock { Some(ProducedBlock {
txs, txs,
block_height, block_height,
leader_id, leader_id,
@ -166,7 +166,7 @@ pub async fn process_block(
pub fn poll_block( pub fn poll_block(
rpc_client: Arc<RpcClient>, rpc_client: Arc<RpcClient>,
block_notification_sender: Sender<ProcessedBlock>, block_notification_sender: Sender<ProducedBlock>,
slot_notification: Receiver<SlotNotification>, slot_notification: Receiver<SlotNotification>,
) -> Vec<AnyhowJoinHandle> { ) -> Vec<AnyhowJoinHandle> {
let task_spawner: AnyhowJoinHandle = tokio::spawn(async move { let task_spawner: AnyhowJoinHandle = tokio::spawn(async move {
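For orientation, a rough sketch of the shape of such a polling task, with stand-in types (`SlotNote`, `BlockStub`) and a placeholder `fetch_block` instead of the real `get_block_with_config` call and ProducedBlock conversion.

```rust
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;

#[derive(Clone, Debug)]
struct SlotNote { processed_slot: u64 }

#[derive(Clone, Debug)]
struct BlockStub { slot: u64 }

// Placeholder for rpc_client.get_block_with_config(..) plus conversion.
async fn fetch_block(slot: u64) -> Option<BlockStub> {
    Some(BlockStub { slot })
}

fn poll_blocks(
    mut slot_notification: broadcast::Receiver<SlotNote>,
    block_notification_sender: mpsc::Sender<BlockStub>,
) -> JoinHandle<anyhow::Result<()>> {
    tokio::spawn(async move {
        // stops on lag or close; the real task is more careful than this sketch
        while let Ok(note) = slot_notification.recv().await {
            if let Some(block) = fetch_block(note.processed_slot).await {
                if block_notification_sender.send(block).await.is_err() {
                    anyhow::bail!("block receiver dropped");
                }
            }
        }
        anyhow::bail!("slot notification stream ended")
    })
}
```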

View File

@ -2,12 +2,14 @@ use dashmap::DashMap;
use log::info; use log::info;
use solana_sdk::{ use solana_sdk::{
clock::MAX_RECENT_BLOCKHASHES, commitment_config::CommitmentConfig, slot_history::Slot, clock::MAX_RECENT_BLOCKHASHES,
commitment_config::{CommitmentConfig, CommitmentLevel},
slot_history::Slot,
}; };
use std::sync::Arc; use std::sync::Arc;
use tokio::{sync::RwLock, time::Instant}; use tokio::sync::RwLock;
use crate::structures::processed_block::ProcessedBlock; use crate::structures::produced_block::ProducedBlock;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct BlockInformation { pub struct BlockInformation {
@ -16,16 +18,18 @@ pub struct BlockInformation {
pub last_valid_blockheight: u64, pub last_valid_blockheight: u64,
pub cleanup_slot: Slot, pub cleanup_slot: Slot,
pub blockhash: String, pub blockhash: String,
pub commitment_config: CommitmentConfig,
} }
impl BlockInformation { impl BlockInformation {
pub fn from_block(block: &ProcessedBlock) -> Self { pub fn from_block(block: &ProducedBlock) -> Self {
BlockInformation { BlockInformation {
slot: block.slot, slot: block.slot,
block_height: block.block_height, block_height: block.block_height,
last_valid_blockheight: block.block_height + MAX_RECENT_BLOCKHASHES as u64, last_valid_blockheight: block.block_height + MAX_RECENT_BLOCKHASHES as u64,
cleanup_slot: block.block_height + 1000, cleanup_slot: block.block_height + 1000,
blockhash: block.blockhash.clone(), blockhash: block.blockhash.clone(),
commitment_config: block.commitment_config,
} }
} }
} }
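As a worked example of the derived fields above (the constant's value is assumed here; the real code imports it as solana_sdk::clock::MAX_RECENT_BLOCKHASHES):

```rust
// Value assumed for illustration; the real code imports this from solana_sdk.
const MAX_RECENT_BLOCKHASHES: u64 = 300;

fn derived_fields(block_height: u64) -> (u64, u64) {
    // a blockhash is treated as valid for roughly MAX_RECENT_BLOCKHASHES blocks
    let last_valid_blockheight = block_height + MAX_RECENT_BLOCKHASHES;
    // keep the entry around well past its validity before cleanup
    let cleanup_slot = block_height + 1000;
    (last_valid_blockheight, cleanup_slot)
}

fn main() {
    assert_eq!(derived_fields(1_000), (1_300, 2_000));
}
```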
@ -36,7 +40,6 @@ pub struct BlockInformationStore {
latest_processed_block: Arc<RwLock<BlockInformation>>, latest_processed_block: Arc<RwLock<BlockInformation>>,
latest_confirmed_block: Arc<RwLock<BlockInformation>>, latest_confirmed_block: Arc<RwLock<BlockInformation>>,
latest_finalized_block: Arc<RwLock<BlockInformation>>, latest_finalized_block: Arc<RwLock<BlockInformation>>,
last_add_block_metric: Arc<RwLock<Instant>>,
} }
impl BlockInformationStore { impl BlockInformationStore {
@ -53,7 +56,6 @@ impl BlockInformationStore {
latest_confirmed_block: Arc::new(RwLock::new(latest_finalized_block.clone())), latest_confirmed_block: Arc::new(RwLock::new(latest_finalized_block.clone())),
latest_finalized_block: Arc::new(RwLock::new(latest_finalized_block)), latest_finalized_block: Arc::new(RwLock::new(latest_finalized_block)),
blocks, blocks,
last_add_block_metric: Arc::new(RwLock::new(Instant::now())),
} }
} }
@ -103,31 +105,40 @@ impl BlockInformationStore {
.clone() .clone()
} }
-    pub async fn add_block(
-        &self,
-        block_info: BlockInformation,
-        commitment_config: CommitmentConfig,
-    ) {
-        // create context for add block metric
-        {
-            let mut last_add_block_metric = self.last_add_block_metric.write().await;
-            *last_add_block_metric = Instant::now();
-        }
+    pub async fn add_block(&self, block_info: BlockInformation) -> bool {
         // save slot copy to avoid borrow issues
         let slot = block_info.slot;
+        let commitment_config = block_info.commitment_config;
-        // Write to block store first in order to prevent
-        // any race condition i.e prevent some one to
-        // ask the map what it doesn't have rn
-        self.blocks
-            .insert(block_info.blockhash.clone(), block_info.clone());
+        // check if the block has already been added with higher commitment level
+        match self.blocks.get_mut(&block_info.blockhash) {
+            Some(mut prev_block_info) => {
+                let should_update = match prev_block_info.commitment_config.commitment {
+                    CommitmentLevel::Finalized => false, // should never update blocks of finalized commitment
+                    CommitmentLevel::Confirmed => {
+                        commitment_config == CommitmentConfig::finalized()
+                    } // should only update confirmed with finalized block
+                    _ => {
+                        commitment_config == CommitmentConfig::confirmed()
+                            || commitment_config == CommitmentConfig::finalized()
+                    }
+                };
+                if !should_update {
+                    return false;
+                }
+                *prev_block_info = block_info.clone();
+            }
+            None => {
+                self.blocks
+                    .insert(block_info.blockhash.clone(), block_info.clone());
+            }
+        }
// update latest block // update latest block
let latest_block = self.get_latest_block_arc(commitment_config); let latest_block = self.get_latest_block_arc(commitment_config);
if slot > latest_block.read().await.slot { if slot > latest_block.read().await.slot {
*latest_block.write().await = block_info; *latest_block.write().await = block_info;
} }
true
} }
pub async fn clean(&self) { pub async fn clean(&self) {
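The core of the new add_block logic is the upgrade rule for an already-stored blockhash; a self-contained sketch, where `Commitment` stands in for solana_sdk's CommitmentLevel:

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Commitment { Processed, Confirmed, Finalized }

// An entry may only move to a higher commitment level, and finalized entries
// are never touched again.
fn should_update(existing: Commitment, incoming: Commitment) -> bool {
    match existing {
        Commitment::Finalized => false,
        Commitment::Confirmed => incoming == Commitment::Finalized,
        Commitment::Processed => {
            incoming == Commitment::Confirmed || incoming == Commitment::Finalized
        }
    }
}

fn main() {
    assert!(should_update(Commitment::Processed, Commitment::Confirmed));
    assert!(should_update(Commitment::Confirmed, Commitment::Finalized));
    assert!(!should_update(Commitment::Finalized, Commitment::Confirmed));
    assert!(!should_update(Commitment::Confirmed, Commitment::Confirmed));
}
```

With this change add_block returns false when the incoming block would downgrade or merely repeat the stored commitment, and the commitment level travels inside BlockInformation instead of being passed as a separate argument.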

View File

@ -1,4 +1,4 @@
use crate::{structures::processed_block::TransactionInfo, types::SubscptionHanderSink}; use crate::{structures::produced_block::TransactionInfo, types::SubscptionHanderSink};
use dashmap::DashMap; use dashmap::DashMap;
use solana_sdk::{ use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel}, commitment_config::{CommitmentConfig, CommitmentLevel},

View File

@ -64,7 +64,7 @@ impl IdentityStakes {
}; };
log::info!( log::info!(
"Idenity stakes {}, {}, {}, {}", "Identity stakes {}, {}, {}, {}",
identity_stakes.total_stakes, identity_stakes.total_stakes,
identity_stakes.min_stakes, identity_stakes.min_stakes,
identity_stakes.max_stakes, identity_stakes.max_stakes,

View File

@ -3,7 +3,7 @@
pub mod identity_stakes; pub mod identity_stakes;
pub mod leader_data; pub mod leader_data;
pub mod notifications; pub mod notifications;
pub mod processed_block; pub mod produced_block;
pub mod proxy_request_format; pub mod proxy_request_format;
pub mod rotating_queue; pub mod rotating_queue;
pub mod slot_notification; pub mod slot_notification;

View File

@ -12,7 +12,7 @@ pub struct TransactionInfo {
} }
#[derive(Default, Debug, Clone)] #[derive(Default, Debug, Clone)]
pub struct ProcessedBlock { pub struct ProducedBlock {
pub txs: Vec<TransactionInfo>, pub txs: Vec<TransactionInfo>,
pub leader_id: Option<String>, pub leader_id: Option<String>,
pub blockhash: String, pub blockhash: String,

View File

@ -4,5 +4,6 @@ use solana_sdk::slot_history::Slot;
#[async_trait] #[async_trait]
pub trait LeaderFetcherInterface: Send + Sync { pub trait LeaderFetcherInterface: Send + Sync {
// should return leader schedule for all the slots >= from and <= to
async fn get_slot_leaders(&self, from: Slot, to: Slot) -> anyhow::Result<Vec<LeaderData>>; async fn get_slot_leaders(&self, from: Slot, to: Slot) -> anyhow::Result<Vec<LeaderData>>;
} }
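To make the inclusive [from, to] contract in the new comment concrete, here is a hypothetical in-memory implementation of a similar trait; the names are stand-ins for LeaderFetcherInterface and LeaderData.

```rust
use async_trait::async_trait;

#[derive(Clone, Debug)]
struct LeaderDataStub { leader_slot: u64, pubkey: String }

#[async_trait]
trait LeaderFetcher: Send + Sync {
    async fn get_slot_leaders(&self, from: u64, to: u64) -> anyhow::Result<Vec<LeaderDataStub>>;
}

// Trivial implementation over a fixed list, used only to illustrate the contract.
struct FixedLeaders(Vec<LeaderDataStub>);

#[async_trait]
impl LeaderFetcher for FixedLeaders {
    async fn get_slot_leaders(&self, from: u64, to: u64) -> anyhow::Result<Vec<LeaderDataStub>> {
        // Inclusive on both ends, matching the comment added in this hunk.
        Ok(self
            .0
            .iter()
            .filter(|l| l.leader_slot >= from && l.leader_slot <= to)
            .cloned()
            .collect())
    }
}
```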

View File

@ -4,11 +4,11 @@ use solana_rpc_client_api::response::{RpcContactInfo, RpcVoteAccountStatus};
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
use crate::{ use crate::{
structures::{processed_block::ProcessedBlock, slot_notification::SlotNotification}, structures::{produced_block::ProducedBlock, slot_notification::SlotNotification},
traits::subscription_sink::SubscriptionSink, traits::subscription_sink::SubscriptionSink,
}; };
pub type BlockStream = Receiver<ProcessedBlock>; pub type BlockStream = Receiver<ProducedBlock>;
pub type SlotStream = Receiver<SlotNotification>; pub type SlotStream = Receiver<SlotNotification>;
pub type VoteAccountStream = Receiver<RpcVoteAccountStatus>; pub type VoteAccountStream = Receiver<RpcVoteAccountStatus>;
pub type ClusterInfoStream = Receiver<Vec<RpcContactInfo>>; pub type ClusterInfoStream = Receiver<Vec<RpcContactInfo>>;
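These aliases are tokio broadcast receivers; a minimal sketch of the producer/consumer wiring such an alias implies, with a stand-in block type and hypothetical names:

```rust
use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct ProducedBlockStub { slot: u64 }

// Stand-in mirroring `pub type BlockStream = Receiver<ProducedBlock>`.
type BlockStreamStub = broadcast::Receiver<ProducedBlockStub>;

#[tokio::main]
async fn main() {
    let (block_sx, mut block_stream): (_, BlockStreamStub) = broadcast::channel(16);
    block_sx
        .send(ProducedBlockStub { slot: 42 })
        .expect("at least one receiver is alive");
    let block = block_stream.recv().await.expect("sender still alive");
    println!("got block at slot {}", block.slot);
}
```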

View File

@ -25,7 +25,7 @@ use solana_lite_rpc_core::stores::{
}; };
use solana_lite_rpc_core::structures::{ use solana_lite_rpc_core::structures::{
identity_stakes::IdentityStakes, notifications::NotificationSender, identity_stakes::IdentityStakes, notifications::NotificationSender,
processed_block::ProcessedBlock, produced_block::ProducedBlock,
}; };
use solana_lite_rpc_core::types::BlockStream; use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::AnyhowJoinHandle; use solana_lite_rpc_core::AnyhowJoinHandle;
@ -48,7 +48,7 @@ use crate::rpc_tester::RpcTester;
async fn get_latest_block( async fn get_latest_block(
mut block_stream: BlockStream, mut block_stream: BlockStream,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> ProcessedBlock { ) -> ProducedBlock {
while let Ok(block) = block_stream.recv().await { while let Ok(block) = block_stream.recv().await {
if block.commitment_config == commitment_config { if block.commitment_config == commitment_config {
return block; return block;

View File

@ -51,12 +51,10 @@ impl DataCachingService {
let mut block_notifier = block_notifier; let mut block_notifier = block_notifier;
loop { loop {
let block = block_notifier.recv().await.expect("Should recv blocks"); let block = block_notifier.recv().await.expect("Should recv blocks");
data_cache data_cache
.block_store .block_store
.add_block( .add_block(BlockInformation::from_block(&block))
BlockInformation::from_block(&block),
block.commitment_config,
)
.await; .await;
let confirmation_status = match block.commitment_config.commitment { let confirmation_status = match block.commitment_config.commitment {