use geyser blockmeta to progress finalized slot (#367)

introduce BlockInfo on a new BlockInfoStream, mapped from yellowstone BlockMeta
fix: last_finalized_slot must progress unconditionally
Groovie | Mango 2024-03-22 19:22:38 +01:00 committed by GitHub
parent defdc20dd5
commit cd9df11501
14 changed files with 219 additions and 93 deletions
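
For context, a minimal sketch (not part of the diff) of how a consumer of the new BlockInfoStream can advance its finalized-slot watermark from BlockInfo alone, without waiting for the matching full ProducedBlock; the track_finalized_slot helper is hypothetical, while BlockInfo and its fields are the ones introduced in this commit.

use solana_lite_rpc_core::structures::block_info::BlockInfo;
use tokio::sync::broadcast::Receiver;

// hypothetical consumer, for illustration only
async fn track_finalized_slot(mut blockinfo_stream: Receiver<BlockInfo>) {
    let mut last_finalized_slot: u64 = 0;
    // note: a real consumer should also handle RecvError::Lagged instead of exiting
    while let Ok(block_info) = blockinfo_stream.recv().await {
        if block_info.commitment_config.is_finalized() {
            // progress unconditionally, even if the full block was never seen or cached
            last_finalized_slot = last_finalized_slot.max(block_info.slot);
        }
        log::trace!("finalized slot watermark: {}", last_finalized_slot);
    }
}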

View File

@@ -4,13 +4,13 @@ use anyhow::bail;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_account_decoder::{UiAccount, UiDataSliceConfig};
use solana_lite_rpc_core::types::BlockInfoStream;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::{
account_data::{AccountData, AccountNotificationMessage, AccountStream},
account_filter::AccountFilters,
},
types::BlockStream,
AnyhowJoinHandle,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
@@ -151,7 +151,7 @@ impl AccountService {
pub fn process_account_stream(
&self,
mut account_stream: AccountStream,
mut block_stream: BlockStream,
mut blockinfo_stream: BlockInfoStream,
) -> Vec<AnyhowJoinHandle> {
let this = self.clone();
let processed_task = tokio::spawn(async move {
@@ -187,19 +187,19 @@ impl AccountService {
let this = self.clone();
let block_processing_task = tokio::spawn(async move {
loop {
match block_stream.recv().await {
Ok(block_notification) => {
if block_notification.commitment_config.is_processed() {
match blockinfo_stream.recv().await {
Ok(block_info) => {
if block_info.commitment_config.is_processed() {
// processed commitment is not handled in this loop
continue;
}
let commitment = Commitment::from(block_notification.commitment_config);
let commitment = Commitment::from(block_info.commitment_config);
let updated_accounts = this
.account_store
.process_slot_data(block_notification.slot, commitment)
.process_slot_data(block_info.slot, commitment)
.await;
if block_notification.commitment_config.is_finalized() {
if block_info.commitment_config.is_finalized() {
ACCOUNT_UPDATES_FINALIZED.add(updated_accounts.len() as i64)
} else {
ACCOUNT_UPDATES_CONFIRMED.add(updated_accounts.len() as i64);

View File

@@ -70,7 +70,7 @@ async fn storage_test() {
let (slot_notifier, _jh_multiplex_slotstream) =
create_grpc_multiplex_processed_slots_subscription(grpc_sources.clone());
let (blocks_notifier, _jh_multiplex_blockstream) =
let (blocks_notifier, _blockmeta_output_stream, _jh_multiplex_blockstream) =
create_grpc_multiplex_blocks_subscription(grpc_sources);
let (epoch_cache, _) = EpochCache::bootstrap_epoch(&rpc_client).await.unwrap();

View File

@@ -1,11 +1,12 @@
use solana_lite_rpc_core::{
structures::account_data::AccountStream,
types::{BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream},
types::{BlockInfoStream, BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream},
};
/// subscribers to broadcast channels should assume that channels are not getting closed unless the system is shutting down
pub struct EndpointStreaming {
pub blocks_notifier: BlockStream,
pub blockinfo_notifier: BlockInfoStream,
pub slot_notifier: SlotStream,
pub vote_account_notifier: VoteAccountStream,
pub cluster_info_notifier: ClusterInfoStream,

View File

@@ -1,6 +1,5 @@
use anyhow::{bail, Context};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcSourceConfig, Message};
use log::{debug, info, trace, warn};
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
@@ -10,6 +9,7 @@ use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_lite_rpc_core::solana_utils::hash_from_str;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
@@ -109,9 +109,9 @@ fn create_grpc_multiplex_processed_block_task(
}
// backpressure: the mpsc sender will block grpc stream until capacity is available
fn create_grpc_multiplex_block_meta_task(
fn create_grpc_multiplex_block_info_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_meta_sender: tokio::sync::mpsc::Sender<BlockMeta>,
block_info_sender: tokio::sync::mpsc::Sender<BlockInfo>,
commitment_config: CommitmentConfig,
) -> Vec<AbortHandle> {
let (autoconnect_tx, mut blocks_rx) = tokio::sync::mpsc::channel(10);
@@ -134,14 +134,24 @@ fn create_grpc_multiplex_block_meta_task(
let proposed_slot = block_meta.slot;
if proposed_slot > tip {
tip = proposed_slot;
let block_meta = BlockMeta {
let block_meta = BlockInfo {
slot: proposed_slot,
block_height: block_meta
.block_height
.expect("block_height from geyser block meta")
.block_height,
blockhash: hash_from_str(&block_meta.blockhash)
.expect("valid blockhash"),
commitment_config,
block_time: block_meta
.block_time
.expect("block_time from geyser block meta")
.timestamp
as u64,
};
let send_started_at = Instant::now();
let send_result = block_meta_sender
let send_result = block_info_sender
.send(block_meta)
.await
.context("Send block to channel");
@@ -188,7 +198,11 @@ fn create_grpc_multiplex_block_meta_task(
/// the channel must never be closed
pub fn create_grpc_multiplex_blocks_subscription(
grpc_sources: Vec<GrpcSourceConfig>,
) -> (Receiver<ProducedBlock>, AnyhowJoinHandle) {
) -> (
Receiver<ProducedBlock>,
Receiver<BlockInfo>,
AnyhowJoinHandle,
) {
info!("Setup grpc multiplexed blocks connection...");
if grpc_sources.is_empty() {
info!("- no grpc connection configured");
@@ -198,9 +212,13 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
// return value is the broadcast receiver
// must NEVER be closed form inside this method
// must NEVER be closed from inside this method
let (producedblock_sender, blocks_output_stream) =
tokio::sync::broadcast::channel::<ProducedBlock>(32);
// provide information about finalized blocks as quickly as possible
// note that the produced block stream will most likely lag behind
let (blockinfo_sender, blockinfo_output_stream) =
tokio::sync::broadcast::channel::<BlockInfo>(32);
let mut reconnect_attempts = 0;
@@ -210,10 +228,12 @@ pub fn create_grpc_multiplex_blocks_subscription(
// channels must NEVER GET CLOSED (unless full restart of multiplexer)
let (processed_block_sender, mut processed_block_reciever) =
tokio::sync::mpsc::channel::<ProducedBlock>(10); // experimental
let (block_meta_sender_confirmed, mut block_meta_reciever_confirmed) =
tokio::sync::mpsc::channel::<BlockMeta>(500);
let (block_meta_sender_finalized, mut block_meta_reciever_finalized) =
tokio::sync::mpsc::channel::<BlockMeta>(500);
let (block_info_sender_processed, mut block_info_reciever_processed) =
tokio::sync::mpsc::channel::<BlockInfo>(500);
let (block_info_sender_confirmed, mut block_info_reciever_confirmed) =
tokio::sync::mpsc::channel::<BlockInfo>(500);
let (block_info_sender_finalized, mut block_info_reciever_finalized) =
tokio::sync::mpsc::channel::<BlockInfo>(500);
let processed_block_sender = processed_block_sender.clone();
reconnect_attempts += 1;
@@ -234,15 +254,22 @@ pub fn create_grpc_multiplex_blocks_subscription(
task_list.extend(processed_blocks_tasks);
// TODO apply same pattern as in create_grpc_multiplex_processed_block_task
let jh_meta_task_confirmed = create_grpc_multiplex_block_meta_task(
let jh_meta_task_processed = create_grpc_multiplex_block_info_task(
&grpc_sources,
block_meta_sender_confirmed.clone(),
block_info_sender_processed.clone(),
CommitmentConfig::processed(),
);
task_list.extend(jh_meta_task_processed);
let jh_meta_task_confirmed = create_grpc_multiplex_block_info_task(
&grpc_sources,
block_info_sender_confirmed.clone(),
CommitmentConfig::confirmed(),
);
task_list.extend(jh_meta_task_confirmed);
let jh_meta_task_finalized = create_grpc_multiplex_block_meta_task(
let jh_meta_task_finalized = create_grpc_multiplex_block_info_task(
&grpc_sources,
block_meta_sender_finalized.clone(),
block_info_sender_finalized.clone(),
CommitmentConfig::finalized(),
);
task_list.extend(jh_meta_task_finalized);
@@ -265,10 +292,10 @@ pub fn create_grpc_multiplex_blocks_subscription(
let mut startup_completed = false;
const MAX_ALLOWED_CLEANUP_WITHOUT_RECV: u8 = 12; // 12*5 = 60s without receiving data
'recv_loop: loop {
debug!("processed_block_sender: {}, block_meta_sender_confirmed: {}, block_meta_sender_finalized: {}",
debug!("channel capacities: processed_block_sender={}, block_info_sender_confirmed={}, block_info_sender_finalized={}",
processed_block_sender.capacity(),
block_meta_sender_confirmed.capacity(),
block_meta_sender_finalized.capacity()
block_info_sender_confirmed.capacity(),
block_info_sender_finalized.capacity()
);
tokio::select! {
processed_block = processed_block_reciever.recv() => {
@@ -277,6 +304,11 @@ pub fn create_grpc_multiplex_blocks_subscription(
let processed_block = processed_block.expect("processed block from stream");
trace!("got processed block {} with blockhash {}",
processed_block.slot, processed_block.blockhash.clone());
if processed_block.commitment_config.is_finalized() {
last_finalized_slot = last_finalized_slot.max(processed_block.slot);
}
if let Err(e) = producedblock_sender.send(processed_block.clone()) {
warn!("produced block channel has no receivers {e:?}");
}
@@ -291,14 +323,30 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
recent_processed_blocks.insert(processed_block.blockhash, processed_block);
},
meta_confirmed = block_meta_reciever_confirmed.recv() => {
blockinfo_processed = block_info_reciever_processed.recv() => {
let blockinfo_processed = blockinfo_processed.expect("processed block info from stream");
let blockhash = blockinfo_processed.blockhash;
trace!("got processed blockinfo {} with blockhash {}",
blockinfo_processed.slot, blockhash);
if let Err(e) = blockinfo_sender.send(blockinfo_processed) {
warn!("Processed blockinfo channel has no receivers {e:?}");
}
},
blockinfo_confirmed = block_info_reciever_confirmed.recv() => {
cleanup_without_confirmed_recv_blocks_meta = 0;
let meta_confirmed = meta_confirmed.expect("confirmed block meta from stream");
let blockhash = meta_confirmed.blockhash;
let blockinfo_confirmed = blockinfo_confirmed.expect("confirmed block info from stream");
let blockhash = blockinfo_confirmed.blockhash;
trace!("got confirmed blockinfo {} with blockhash {}",
blockinfo_confirmed.slot, blockhash);
if let Err(e) = blockinfo_sender.send(blockinfo_confirmed) {
warn!("Confirmed blockinfo channel has no receivers {e:?}");
}
if let Some(cached_processed_block) = recent_processed_blocks.get(&blockhash) {
let confirmed_block = cached_processed_block.to_confirmed_block();
debug!("got confirmed blockmeta {} with blockhash {}",
debug!("got confirmed blockinfo {} with blockhash {}",
confirmed_block.slot, confirmed_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(confirmed_block) {
warn!("confirmed block channel has no receivers {e:?}");
@@ -309,23 +357,29 @@ pub fn create_grpc_multiplex_blocks_subscription(
confirmed_block_not_yet_processed.len(), recent_processed_blocks.len());
}
},
meta_finalized = block_meta_reciever_finalized.recv() => {
blockinfo_finalized = block_info_reciever_finalized.recv() => {
cleanup_without_finalized_recv_blocks_meta = 0;
let meta_finalized = meta_finalized.expect("finalized block meta from stream");
// let _span = debug_span!("sequence_block_meta_finalized", ?meta_finalized.slot).entered();
let blockhash = meta_finalized.blockhash;
let blockinfo_finalized = blockinfo_finalized.expect("finalized block info from stream");
last_finalized_slot = last_finalized_slot.max(blockinfo_finalized.slot);
let blockhash = blockinfo_finalized.blockhash;
trace!("got finalized blockinfo {} with blockhash {}",
blockinfo_finalized.slot, blockhash);
if let Err(e) = blockinfo_sender.send(blockinfo_finalized) {
warn!("Finalized blockinfo channel has no receivers {e:?}");
}
if let Some(cached_processed_block) = recent_processed_blocks.remove(&blockhash) {
let finalized_block = cached_processed_block.to_finalized_block();
last_finalized_slot = finalized_block.slot;
startup_completed = true;
debug!("got finalized blockmeta {} with blockhash {}",
debug!("got finalized blockinfo {} with blockhash {}",
finalized_block.slot, finalized_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(finalized_block) {
warn!("Finalized block channel has no receivers {e:?}");
}
} else if startup_completed {
// this warning is ok for the first few blocks when we start lrpc
log::warn!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
log::warn!("finalized blockinfo received for blockhash {} which was never seen or already emitted", blockhash);
finalized_block_not_yet_processed.insert(blockhash);
}
},
@@ -334,7 +388,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
if cleanup_without_recv_full_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
cleanup_without_confirmed_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
cleanup_without_finalized_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV {
log::error!("block or block meta geyser stream stopped - restarting multiplexer ({}-{}-{})",
log::error!("block or block info geyser stream stopped - restarting multiplexer ({}-{}-{})",
cleanup_without_recv_full_blocks, cleanup_without_confirmed_recv_blocks_meta, cleanup_without_finalized_recv_blocks_meta,);
// throttle a bit
sleep(Duration::from_millis(1500)).await;
@@ -358,7 +412,11 @@ pub fn create_grpc_multiplex_blocks_subscription(
} // -- END reconnect loop
});
(blocks_output_stream, jh_block_emitter_task)
(
blocks_output_stream,
blockinfo_output_stream,
jh_block_emitter_task,
)
}
pub fn create_grpc_multiplex_processed_slots_subscription(
@@ -443,30 +501,6 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
(multiplexed_messages_rx, jh_multiplex_task)
}
#[allow(dead_code)]
struct BlockMeta {
pub slot: Slot,
pub blockhash: solana_sdk::hash::Hash,
}
struct BlockMetaExtractor(CommitmentConfig);
impl FromYellowstoneExtractor for BlockMetaExtractor {
type Target = BlockMeta;
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(u64, BlockMeta)> {
match update.update_oneof {
Some(UpdateOneof::BlockMeta(block_meta)) => Some((
block_meta.slot,
BlockMeta {
slot: block_meta.slot,
blockhash: hash_from_str(&block_meta.blockhash).unwrap(),
},
)),
_ => None,
}
}
}
fn map_slot_from_yellowstone_update(update: SubscribeUpdate) -> Option<Slot> {
match update.update_oneof {
Some(UpdateOneof::Slot(update_slot_message)) => Some(update_slot_message.slot),

View File

@@ -271,7 +271,7 @@ pub fn create_grpc_subscription(
let (slot_multiplex_channel, jh_multiplex_slotstream) =
create_grpc_multiplex_processed_slots_subscription(grpc_sources.clone());
let (block_multiplex_channel, jh_multiplex_blockstream) =
let (block_multiplex_channel, blockmeta_channel, jh_multiplex_blockstream) =
create_grpc_multiplex_blocks_subscription(grpc_sources.clone());
let cluster_info_polling = poll_cluster_info(rpc_client.clone(), cluster_info_sx);
@@ -283,6 +283,7 @@ pub fn create_grpc_subscription(
create_grpc_account_streaming(grpc_sources, accounts_filter);
let streamers = EndpointStreaming {
blocks_notifier: block_multiplex_channel,
blockinfo_notifier: blockmeta_channel,
slot_notifier: slot_multiplex_channel,
cluster_info_notifier,
vote_account_notifier,
@@ -300,6 +301,7 @@ pub fn create_grpc_subscription(
} else {
let streamers = EndpointStreaming {
blocks_notifier: block_multiplex_channel,
blockinfo_notifier: blockmeta_channel,
slot_notifier: slot_multiplex_channel,
cluster_info_notifier,
vote_account_notifier,

View File

@@ -16,6 +16,7 @@ pub fn create_json_rpc_polling_subscription(
) -> anyhow::Result<(EndpointStreaming, Vec<AnyhowJoinHandle>)> {
let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(16);
let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(16);
let (blockinfo_sx, blockinfo_notifier) = tokio::sync::broadcast::channel(16);
let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(16);
let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(16);
// accounts are not supported with rpc polling
@@ -26,6 +27,7 @@ pub fn create_json_rpc_polling_subscription(
let mut block_polling_tasks = poll_block(
rpc_client.clone(),
block_sx,
blockinfo_sx,
slot_notifier.resubscribe(),
num_parallel_tasks,
);
@@ -39,6 +41,7 @@ pub fn create_json_rpc_polling_subscription(
let streamers = EndpointStreaming {
blocks_notifier,
blockinfo_notifier,
slot_notifier,
cluster_info_notifier,
vote_account_notifier,

View File

@@ -1,6 +1,7 @@
use anyhow::{bail, Context};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::solana_utils::hash_from_str;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use solana_lite_rpc_core::structures::produced_block::{ProducedBlockInner, TransactionInfo};
use solana_lite_rpc_core::{
structures::{
@@ -54,6 +55,7 @@ pub async fn process_block(
pub fn poll_block(
rpc_client: Arc<RpcClient>,
block_notification_sender: Sender<ProducedBlock>,
blockinfo_notification_sender: Sender<BlockInfo>,
slot_notification: Receiver<SlotNotification>,
num_parallel_tasks: usize,
) -> Vec<AnyhowJoinHandle> {
@@ -66,6 +68,7 @@ pub fn poll_block(
for _i in 0..num_parallel_tasks {
let block_notification_sender = block_notification_sender.clone();
let blockinfo_notification_sender = blockinfo_notification_sender.clone();
let rpc_client = rpc_client.clone();
let block_schedule_queue_rx = block_schedule_queue_rx.clone();
let slot_retry_queue_sx = slot_retry_queue_sx.clone();
@@ -79,9 +82,13 @@ pub fn poll_block(
process_block(rpc_client.as_ref(), slot, commitment_config).await;
match processed_block {
Some(processed_block) => {
let block_info = map_block_info(&processed_block);
block_notification_sender
.send(processed_block)
.context("Processed block should be sent")?;
blockinfo_notification_sender
.send(block_info)
.context("Processed block info should be sent")?;
// schedule to get finalized commitment
if commitment_config.commitment != CommitmentLevel::Finalized {
let retry_at = tokio::time::Instant::now()
@@ -332,6 +339,16 @@ pub fn from_ui_block(
ProducedBlock::new(inner, commitment_config)
}
fn map_block_info(produced_block: &ProducedBlock) -> BlockInfo {
BlockInfo {
slot: produced_block.slot,
block_height: produced_block.block_height,
blockhash: produced_block.blockhash,
commitment_config: produced_block.commitment_config,
block_time: produced_block.block_time,
}
}
#[inline]
fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 {
(units as u64 * 1000) / additional_fee as u64

View File

@@ -7,6 +7,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::structures::block_info::BlockInfo;
use crate::structures::produced_block::ProducedBlock;
use solana_sdk::hash::Hash;
@@ -33,6 +34,17 @@ impl BlockInformation {
block_time: block.block_time,
}
}
pub fn from_block_info(block_info: &BlockInfo) -> Self {
BlockInformation {
slot: block_info.slot,
block_height: block_info.block_height,
last_valid_blockheight: block_info.block_height + MAX_RECENT_BLOCKHASHES as u64,
cleanup_slot: block_info.block_height + 1000,
blockhash: block_info.blockhash,
commitment_config: block_info.commitment_config,
block_time: block_info.block_time,
}
}
}
/// - Block Information Store

View File

@@ -0,0 +1,11 @@
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::hash::Hash;
#[derive(Clone, Debug)]
pub struct BlockInfo {
pub slot: u64,
pub block_height: u64,
pub blockhash: Hash,
pub commitment_config: CommitmentConfig,
pub block_time: u64,
}

View File

@@ -2,6 +2,7 @@
pub mod account_data;
pub mod account_filter;
pub mod block_info;
pub mod epoch;
pub mod identity_stakes;
pub mod leader_data;

View File

@@ -3,13 +3,22 @@ use std::sync::Arc;
use solana_rpc_client_api::response::{RpcContactInfo, RpcVoteAccountStatus};
use tokio::sync::broadcast::Receiver;
use crate::structures::block_info::BlockInfo;
use crate::{
structures::{produced_block::ProducedBlock, slot_notification::SlotNotification},
traits::subscription_sink::SubscriptionSink,
};
// full blocks, commitment level: processed, confirmed, finalized
// note: there is no guarantee about the order
// note: there is no guarantee about the order wrt commitment level
// note: there is no guarantee about the order wrt block vs block meta
pub type BlockStream = Receiver<ProducedBlock>;
// block info (slot, blockhash, etc), commitment level: processed, confirmed, finalized
// note: there is no guarantee about the order wrt commitment level
pub type BlockInfoStream = Receiver<BlockInfo>;
pub type SlotStream = Receiver<SlotNotification>;
pub type VoteAccountStream = Receiver<RpcVoteAccountStatus>;
pub type ClusterInfoStream = Receiver<Vec<RpcContactInfo>>;
pub type SubscptionHanderSink = Arc<dyn SubscriptionSink>;

View File

@@ -42,10 +42,9 @@ use solana_lite_rpc_core::structures::account_filter::AccountFilters;
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use solana_lite_rpc_core::structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender,
produced_block::ProducedBlock,
};
use solana_lite_rpc_core::traits::address_lookup_table_interface::AddressLookupTableInterface;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::types::{BlockInfoStream, BlockStream};
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService;
use solana_lite_rpc_services::data_caching_service::DataCachingService;
@@ -55,6 +54,7 @@ use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
use solana_lite_rpc_services::tx_sender::TxSender;
use lite_rpc::postgres_logger;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use solana_lite_rpc_prioritization_fees::start_block_priofees_task;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
@@ -75,27 +75,27 @@ use tracing_subscriber::EnvFilter;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
async fn get_latest_block(
mut block_stream: BlockStream,
async fn get_latest_block_info(
mut blockinfo_stream: BlockInfoStream,
commitment_config: CommitmentConfig,
) -> ProducedBlock {
) -> BlockInfo {
let started = Instant::now();
loop {
match timeout(Duration::from_millis(500), block_stream.recv()).await {
Ok(Ok(block)) => {
if block.commitment_config == commitment_config {
return block;
match timeout(Duration::from_millis(500), blockinfo_stream.recv()).await {
Ok(Ok(block_info)) => {
if block_info.commitment_config == commitment_config {
return block_info;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block ({}) ... {:.02}ms",
"waiting for latest block info ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(_error)) => {
panic!("Did not recv blocks");
panic!("Did not recv block info");
}
}
}
@@ -201,6 +201,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
let EndpointStreaming {
// note: blocks_notifier will be dropped at some point
blocks_notifier,
blockinfo_notifier,
cluster_info_notifier,
slot_notifier,
vote_account_notifier,
@@ -235,8 +236,10 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
let account_service = AccountService::new(account_storage, account_notification_sender);
account_service
.process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe());
account_service.process_account_stream(
account_stream.resubscribe(),
blockinfo_notifier.resubscribe(),
);
account_service
.populate_from_rpc(
@@ -250,21 +253,24 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
None
};
info!("Waiting for first finalized block...");
let finalized_block =
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
info!("Got finalized block: {:?}", finalized_block.slot);
info!("Waiting for first finalized block info...");
let finalized_block_info = get_latest_block_info(
blockinfo_notifier.resubscribe(),
CommitmentConfig::finalized(),
)
.await;
info!("Got finalized block info: {:?}", finalized_block_info.slot);
let (epoch_data, _current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?;
let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
BlockInformationStore::new(BlockInformation::from_block_info(&finalized_block_info));
let data_cache = DataCache {
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalized_block.slot),
slot_cache: SlotCache::new(finalized_block_info.slot),
tx_subs: SubscriptionStore::default(),
txs: TxStore {
store: Arc::new(DashMap::new()),
@@ -281,6 +287,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
// to avoid lagging we resubscribe to block notification
let data_caching_service = data_cache_service.listen(
blocks_notifier.resubscribe(),
blockinfo_notifier.resubscribe(),
slot_notifier.resubscribe(),
cluster_info_notifier,
vote_account_notifier,

View File

@@ -1,3 +1,4 @@
use solana_lite_rpc_core::types::BlockInfoStream;
use solana_lite_rpc_core::{
stores::data_cache::DataCache,
structures::notifications::NotificationSender,
@@ -14,6 +15,7 @@ use solana_lite_rpc_services::{
tx_sender::TxSender,
};
use std::time::Duration;
pub struct ServiceSpawner {
pub prometheus_addr: String,
pub data_cache: DataCache,
@@ -38,9 +40,11 @@ impl ServiceSpawner {
}
}
pub async fn spawn_data_caching_service(
// TODO remove
pub async fn _spawn_data_caching_service(
&self,
block_notifier: BlockStream,
blockinfo_notifier: BlockInfoStream,
slot_notification: SlotStream,
cluster_info_notification: ClusterInfoStream,
va_notification: VoteAccountStream,
@@ -52,6 +56,7 @@ impl ServiceSpawner {
data_service.listen(
block_notifier,
blockinfo_notifier,
slot_notification,
cluster_info_notification,
va_notification,

View File

@@ -7,12 +7,14 @@ use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter};
use solana_lite_rpc_core::stores::{
block_information_store::BlockInformation, data_cache::DataCache,
};
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use solana_lite_rpc_core::types::{BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream};
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::MAX_RECENT_BLOCKHASHES;
use solana_sdk::commitment_config::CommitmentLevel;
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
@@ -43,13 +45,15 @@ impl DataCachingService {
pub fn listen(
self,
block_notifier: BlockStream,
blockinfo_notifier: Receiver<BlockInfo>,
slot_notification: SlotStream,
cluster_info_notification: ClusterInfoStream,
va_notification: VoteAccountStream,
) -> Vec<AnyhowJoinHandle> {
// clone the ledger to move into the processor task
let data_cache = self.data_cache.clone();
// process all the data into the ledger
let block_information_store_block = data_cache.block_information_store.clone();
let block_information_store_block_info = data_cache.block_information_store.clone();
let block_cache_jh = tokio::spawn(async move {
let mut block_notifier = block_notifier;
loop {
@@ -64,8 +68,8 @@ impl DataCachingService {
}
};
data_cache
.block_information_store
// note: most likely the block has already been added from the blockinfo_notifier stream
block_information_store_block
.add_block(BlockInformation::from_block(&block))
.await;
@@ -76,9 +80,8 @@ impl DataCachingService {
};
for tx in &block.transactions {
let block_info = data_cache
.block_information_store
.get_block_info(&tx.recent_blockhash);
let block_info =
block_information_store_block.get_block_info(&tx.recent_blockhash);
let last_valid_blockheight = if let Some(block_info) = block_info {
block_info.last_valid_blockheight
} else {
@@ -118,6 +121,26 @@ impl DataCachingService {
}
});
let blockinfo_cache_jh = tokio::spawn(async move {
let mut blockinfo_notifier = blockinfo_notifier;
loop {
let block_info = match blockinfo_notifier.recv().await {
Ok(block_info) => block_info,
Err(RecvError::Lagged(blockinfo_lagged)) => {
warn!("Lagged {} block info - continue", blockinfo_lagged);
continue;
}
Err(RecvError::Closed) => {
bail!("BlockInfo stream has been closed - abort");
}
};
block_information_store_block_info
.add_block(BlockInformation::from_block_info(&block_info))
.await;
}
});
let data_cache = self.data_cache.clone();
let slot_cache_jh = tokio::spawn(async move {
let mut slot_notification = slot_notification;
@@ -174,6 +197,7 @@ impl DataCachingService {
vec![
slot_cache_jh,
block_cache_jh,
blockinfo_cache_jh,
cluster_info_jh,
identity_stakes_jh,
cleaning_service,