From 3f70ddb2c580518eaee87329629319d192446370 Mon Sep 17 00:00:00 2001 From: Tyera Date: Wed, 10 May 2023 17:20:51 -0600 Subject: [PATCH] Add entry notification service for geyser (#31290) * Move entry_notifier_interface * Add EntryNotifierService * Use descriptive struct in sender/receiver * Optionally initialize EntryNotifierService in validator * Plumb EntryNotfierSender into Tvu, blockstore_processor * Plumb EntryNotfierSender into Tpu * Only return one option when constructing EntryNotifierService --- Cargo.lock | 1 + core/src/replay_stage.rs | 15 ++++ core/src/tpu.rs | 6 +- core/src/tvu.rs | 5 +- core/src/validator.rs | 45 ++++++++++- geyser-plugin-manager/Cargo.toml | 1 + geyser-plugin-manager/src/entry_notifier.rs | 2 +- .../src/geyser_plugin_service.rs | 2 +- ledger-tool/src/ledger_utils.rs | 2 + ledger/src/bank_forks_utils.rs | 6 ++ ledger/src/blockstore_processor.rs | 31 +++++++- .../src/entry_notifier_interface.rs | 0 ledger/src/entry_notifier_service.rs | 78 +++++++++++++++++++ ledger/src/lib.rs | 2 + local-cluster/tests/local_cluster.rs | 1 + programs/sbf/Cargo.lock | 1 + rpc/src/lib.rs | 1 - 17 files changed, 188 insertions(+), 11 deletions(-) rename {rpc => ledger}/src/entry_notifier_interface.rs (100%) create mode 100644 ledger/src/entry_notifier_service.rs diff --git a/Cargo.lock b/Cargo.lock index f2bb37c51a..08a6ad16d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5815,6 +5815,7 @@ dependencies = [ "serde_json", "solana-entry", "solana-geyser-plugin-interface", + "solana-ledger", "solana-measure", "solana-metrics", "solana-rpc", diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 335b12acfe..f202cfa26d 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -42,6 +42,7 @@ use { blockstore_processor::{ self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender, }, + entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache, leader_schedule_utils::first_of_consecutive_leader_slots, }, @@ -236,6 +237,7 @@ pub struct ReplayStageConfig { pub transaction_status_sender: Option, pub rewards_recorder_sender: Option, pub cache_block_meta_sender: Option, + pub entry_notification_sender: Option, pub bank_notification_sender: Option, pub wait_for_vote_to_start_leader: bool, pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender, @@ -501,6 +503,7 @@ impl ReplayStage { transaction_status_sender, rewards_recorder_sender, cache_block_meta_sender, + entry_notification_sender, bank_notification_sender, wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, @@ -596,6 +599,7 @@ impl ReplayStage { &mut progress, transaction_status_sender.as_ref(), cache_block_meta_sender.as_ref(), + entry_notification_sender.as_ref(), &verify_recyclers, &mut heaviest_subtree_fork_choice, &replay_vote_sender, @@ -1869,12 +1873,14 @@ impl ReplayStage { } } + #[allow(clippy::too_many_arguments)] fn replay_blockstore_into_bank( bank: &Arc, blockstore: &Blockstore, replay_stats: &RwLock, replay_progress: &RwLock, transaction_status_sender: Option<&TransactionStatusSender>, + entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: &ReplayVoteSender, verify_recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, @@ -1893,6 +1899,7 @@ impl ReplayStage { &mut w_replay_progress, false, transaction_status_sender, + entry_notification_sender, Some(replay_vote_sender), verify_recyclers, false, @@ -2412,6 +2419,7 @@ impl ReplayStage { vote_account: &Pubkey, progress: &mut ProgressMap, transaction_status_sender: Option<&TransactionStatusSender>, + entry_notification_sender: Option<&EntryNotifierSender>, verify_recyclers: &VerifyRecyclers, replay_vote_sender: &ReplayVoteSender, replay_timing: &mut ReplayTiming, @@ -2490,6 +2498,7 @@ impl ReplayStage { &replay_stats, &replay_progress, transaction_status_sender, + entry_notification_sender, &replay_vote_sender.clone(), &verify_recyclers.clone(), log_messages_bytes_limit, @@ -2519,6 +2528,7 @@ impl ReplayStage { vote_account: &Pubkey, progress: &mut ProgressMap, transaction_status_sender: Option<&TransactionStatusSender>, + entry_notification_sender: Option<&EntryNotifierSender>, verify_recyclers: &VerifyRecyclers, replay_vote_sender: &ReplayVoteSender, replay_timing: &mut ReplayTiming, @@ -2571,6 +2581,7 @@ impl ReplayStage { &bank_progress.replay_stats, &bank_progress.replay_progress, transaction_status_sender, + entry_notification_sender, &replay_vote_sender.clone(), &verify_recyclers.clone(), log_messages_bytes_limit, @@ -2781,6 +2792,7 @@ impl ReplayStage { progress: &mut ProgressMap, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, verify_recyclers: &VerifyRecyclers, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, replay_vote_sender: &ReplayVoteSender, @@ -2819,6 +2831,7 @@ impl ReplayStage { vote_account, progress, transaction_status_sender, + entry_notification_sender, verify_recyclers, replay_vote_sender, replay_timing, @@ -2837,6 +2850,7 @@ impl ReplayStage { vote_account, progress, transaction_status_sender, + entry_notification_sender, verify_recyclers, replay_vote_sender, replay_timing, @@ -4437,6 +4451,7 @@ pub(crate) mod tests { &bank1_progress.replay_stats, &bank1_progress.replay_progress, None, + None, &replay_vote_sender, &VerifyRecyclers::default(), None, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index f19340a27f..dd65e99078 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -19,7 +19,10 @@ use { crossbeam_channel::{unbounded, Receiver}, solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, - solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender}, + solana_ledger::{ + blockstore::Blockstore, blockstore_processor::TransactionStatusSender, + entry_notifier_service::EntryNotifierSender, + }, solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, solana_rpc::{ optimistically_confirmed_bank_tracker::BankNotificationSender, @@ -80,6 +83,7 @@ impl Tpu { sockets: TpuSockets, subscriptions: &Arc, transaction_status_sender: Option, + _entry_notification_sender: Option, blockstore: &Arc, broadcast_type: &BroadcastStageType, exit: &Arc, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 552ce77ec6..4ba16d39d2 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -37,7 +37,7 @@ use { }, solana_ledger::{ blockstore::Blockstore, blockstore_processor::TransactionStatusSender, - leader_schedule_cache::LeaderScheduleCache, + entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache, }, solana_poh::poh_recorder::PohRecorder, solana_rpc::{ @@ -120,6 +120,7 @@ impl Tvu { transaction_status_sender: Option, rewards_recorder_sender: Option, cache_block_meta_sender: Option, + entry_notification_sender: Option, vote_tracker: Arc, retransmit_slots_sender: RetransmitSlotsSender, gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver, @@ -243,6 +244,7 @@ impl Tvu { transaction_status_sender, rewards_recorder_sender, cache_block_meta_sender, + entry_notification_sender, bank_notification_sender, wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, @@ -460,6 +462,7 @@ pub mod tests { None, None, None, + None, Arc::::default(), retransmit_slots_sender, gossip_verified_vote_hash_receiver, diff --git a/core/src/validator.rs b/core/src/validator.rs index 220d040d1b..b3a3e35d99 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -51,6 +51,8 @@ use { }, blockstore_options::{BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions}, blockstore_processor::{self, TransactionStatusSender}, + entry_notifier_interface::EntryNotifierLock, + entry_notifier_service::{EntryNotifierSender, EntryNotifierService}, leader_schedule::FixedSchedule, leader_schedule_cache::LeaderScheduleCache, }, @@ -422,6 +424,7 @@ pub struct Validator { transaction_status_service: Option, rewards_recorder_service: Option, cache_block_meta_service: Option, + entry_notifier_service: Option, system_monitor_service: Option, sample_performance_service: Option, poh_timing_report_service: PohTimingReportService, @@ -584,14 +587,21 @@ impl Validator { .as_ref() .and_then(|geyser_plugin_service| geyser_plugin_service.get_transaction_notifier()); + let entry_notifier = geyser_plugin_service + .as_ref() + .and_then(|geyser_plugin_service| geyser_plugin_service.get_entry_notifier()); + let block_metadata_notifier = geyser_plugin_service .as_ref() .and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier()); info!( - "Geyser plugin: accounts_update_notifier: {} transaction_notifier: {}", + "Geyser plugin: accounts_update_notifier: {}, \ + transaction_notifier: {}, \ + entry_notifier: {}", accounts_update_notifier.is_some(), - transaction_notifier.is_some() + transaction_notifier.is_some(), + entry_notifier.is_some() ); let system_monitor_service = Some(SystemMonitorService::new( @@ -630,6 +640,7 @@ impl Validator { blockstore_process_options, blockstore_root_scan, pruned_banks_receiver, + entry_notifier_service, ) = load_blockstore( config, ledger_path, @@ -637,6 +648,7 @@ impl Validator { &start_progress, accounts_update_notifier, transaction_notifier, + entry_notifier, Some(poh_timing_point_sender.clone()), )?; @@ -751,6 +763,9 @@ impl Validator { ); let leader_schedule_cache = Arc::new(leader_schedule_cache); + let entry_notification_sender = entry_notifier_service + .as_ref() + .map(|service| service.sender()); let mut process_blockstore = ProcessBlockStore::new( &id, vote_account, @@ -762,6 +777,7 @@ impl Validator { &blockstore_process_options, transaction_status_sender.as_ref(), cache_block_meta_sender.clone(), + entry_notification_sender, blockstore_root_scan, accounts_background_request_sender.clone(), config, @@ -1074,6 +1090,9 @@ impl Validator { info!("Disabled banking tracer"); } + let entry_notification_sender = entry_notifier_service + .as_ref() + .map(|service| service.sender_cloned()); let (replay_vote_sender, replay_vote_receiver) = unbounded(); let tvu = Tvu::new( vote_account, @@ -1100,6 +1119,7 @@ impl Validator { transaction_status_sender.clone(), rewards_recorder_sender, cache_block_meta_sender, + entry_notification_sender.clone(), vote_tracker.clone(), retransmit_slots_sender, gossip_verified_vote_hash_receiver, @@ -1141,6 +1161,7 @@ impl Validator { }, &rpc_subscriptions, transaction_status_sender, + entry_notification_sender, &blockstore, &config.broadcast_stage_type, &exit, @@ -1183,6 +1204,7 @@ impl Validator { transaction_status_service, rewards_recorder_service, cache_block_meta_service, + entry_notifier_service, system_monitor_service, sample_performance_service, poh_timing_report_service, @@ -1299,6 +1321,12 @@ impl Validator { .expect("sample_performance_service"); } + if let Some(entry_notifier_service) = self.entry_notifier_service { + entry_notifier_service + .join() + .expect("entry_notifier_service"); + } + if let Some(s) = self.snapshot_packager_service { s.join().expect("snapshot_packager_service"); } @@ -1488,6 +1516,7 @@ fn load_blockstore( start_progress: &Arc>, accounts_update_notifier: Option, transaction_notifier: Option, + entry_notifier: Option, poh_timing_point_sender: Option, ) -> Result< ( @@ -1503,6 +1532,7 @@ fn load_blockstore( blockstore_processor::ProcessOptions, BlockstoreRootScan, DroppedSlotsReceiver, + Option, ), String, > { @@ -1580,6 +1610,9 @@ fn load_blockstore( TransactionHistoryServices::default() }; + let entry_notifier_service = + entry_notifier.map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit)); + let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) = bank_forks_utils::load_bank_forks( &genesis_config, @@ -1591,6 +1624,9 @@ fn load_blockstore( transaction_history_services .cache_block_meta_sender .as_ref(), + entry_notifier_service + .as_ref() + .map(|service| service.sender()), accounts_update_notifier, exit, ); @@ -1643,6 +1679,7 @@ fn load_blockstore( process_options, blockstore_root_scan, pruned_banks_receiver, + entry_notifier_service, )) } @@ -1657,6 +1694,7 @@ pub struct ProcessBlockStore<'a> { process_options: &'a blockstore_processor::ProcessOptions, transaction_status_sender: Option<&'a TransactionStatusSender>, cache_block_meta_sender: Option, + entry_notification_sender: Option<&'a EntryNotifierSender>, blockstore_root_scan: Option, accounts_background_request_sender: AbsRequestSender, config: &'a ValidatorConfig, @@ -1676,6 +1714,7 @@ impl<'a> ProcessBlockStore<'a> { process_options: &'a blockstore_processor::ProcessOptions, transaction_status_sender: Option<&'a TransactionStatusSender>, cache_block_meta_sender: Option, + entry_notification_sender: Option<&'a EntryNotifierSender>, blockstore_root_scan: BlockstoreRootScan, accounts_background_request_sender: AbsRequestSender, config: &'a ValidatorConfig, @@ -1691,6 +1730,7 @@ impl<'a> ProcessBlockStore<'a> { process_options, transaction_status_sender, cache_block_meta_sender, + entry_notification_sender, blockstore_root_scan: Some(blockstore_root_scan), accounts_background_request_sender, config, @@ -1728,6 +1768,7 @@ impl<'a> ProcessBlockStore<'a> { self.process_options, self.transaction_status_sender, self.cache_block_meta_sender.as_ref(), + self.entry_notification_sender, &self.accounts_background_request_sender, ) { exit.store(true, Ordering::Relaxed); diff --git a/geyser-plugin-manager/Cargo.toml b/geyser-plugin-manager/Cargo.toml index 34dbe998db..bf0d61a636 100644 --- a/geyser-plugin-manager/Cargo.toml +++ b/geyser-plugin-manager/Cargo.toml @@ -21,6 +21,7 @@ log = { workspace = true } serde_json = { workspace = true } solana-entry = { workspace = true } solana-geyser-plugin-interface = { workspace = true } +solana-ledger = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-rpc = { workspace = true } diff --git a/geyser-plugin-manager/src/entry_notifier.rs b/geyser-plugin-manager/src/entry_notifier.rs index 26a2db2382..ce6c3239c0 100644 --- a/geyser-plugin-manager/src/entry_notifier.rs +++ b/geyser-plugin-manager/src/entry_notifier.rs @@ -6,9 +6,9 @@ use { solana_geyser_plugin_interface::geyser_plugin_interface::{ ReplicaEntryInfo, ReplicaEntryInfoVersions, }, + solana_ledger::entry_notifier_interface::EntryNotifier, solana_measure::measure::Measure, solana_metrics::*, - solana_rpc::entry_notifier_interface::EntryNotifier, solana_sdk::clock::Slot, std::sync::{Arc, RwLock}, }; diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index ece7bb1c63..18babc7d47 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -11,8 +11,8 @@ use { }, crossbeam_channel::Receiver, log::*, + solana_ledger::entry_notifier_interface::EntryNotifierLock, solana_rpc::{ - entry_notifier_interface::EntryNotifierLock, optimistically_confirmed_bank_tracker::SlotNotification, transaction_notifier_interface::TransactionNotifierLock, }, diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 7b37380244..d29bcb0dbe 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -235,6 +235,7 @@ pub fn load_bank_forks( snapshot_config.as_ref(), &process_options, None, + None, // Maybe support this later, though accounts_update_notifier, &Arc::default(), ); @@ -322,6 +323,7 @@ pub fn load_bank_forks( &process_options, transaction_status_sender.as_ref(), None, + None, // Maybe support this later, though &accounts_background_request_sender, ) .map(|_| (bank_forks, starting_snapshot_hashes)); diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 896a8baad9..ebf48c864d 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -5,6 +5,7 @@ use { self, BlockstoreProcessorError, CacheBlockMetaSender, ProcessOptions, TransactionStatusSender, }, + entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache, }, log::*, @@ -49,6 +50,7 @@ pub fn load( process_options: ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, accounts_update_notifier: Option, exit: &Arc, ) -> LoadResult { @@ -60,6 +62,7 @@ pub fn load( snapshot_config, &process_options, cache_block_meta_sender, + entry_notification_sender, accounts_update_notifier, exit, ); @@ -71,6 +74,7 @@ pub fn load( &process_options, transaction_status_sender, cache_block_meta_sender, + entry_notification_sender, &AbsRequestSender::default(), ) .map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes)) @@ -85,6 +89,7 @@ pub fn load_bank_forks( snapshot_config: Option<&SnapshotConfig>, process_options: &ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, accounts_update_notifier: Option, exit: &Arc, ) -> ( @@ -145,6 +150,7 @@ pub fn load_bank_forks( account_paths, process_options, cache_block_meta_sender, + entry_notification_sender, accounts_update_notifier, exit, ); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 247d8a1a40..71eabe62b6 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1,8 +1,8 @@ use { crate::{ block_error::BlockError, blockstore::Blockstore, blockstore_db::BlockstoreError, - blockstore_meta::SlotMeta, leader_schedule_cache::LeaderScheduleCache, - token_balances::collect_token_balances, + blockstore_meta::SlotMeta, entry_notifier_service::EntryNotifierSender, + leader_schedule_cache::LeaderScheduleCache, token_balances::collect_token_balances, }, chrono_humanize::{Accuracy, HumanTime, Tense}, crossbeam_channel::Sender, @@ -677,6 +677,7 @@ pub fn test_process_blockstore( opts, None, None, + None, exit, ); @@ -687,6 +688,7 @@ pub fn test_process_blockstore( opts, None, None, + None, &abs_request_sender, ) .unwrap(); @@ -703,6 +705,7 @@ pub(crate) fn process_blockstore_for_bank_0( account_paths: Vec, opts: &ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, accounts_update_notifier: Option, exit: &Arc, ) -> Arc> { @@ -729,6 +732,7 @@ pub(crate) fn process_blockstore_for_bank_0( opts, &VerifyRecyclers::default(), cache_block_meta_sender, + entry_notification_sender, ); bank_forks } @@ -742,6 +746,7 @@ pub fn process_blockstore_from_root( opts: &ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, accounts_background_request_sender: &AbsRequestSender, ) -> result::Result<(), BlockstoreProcessorError> { let (start_slot, start_slot_hash) = { @@ -791,6 +796,7 @@ pub fn process_blockstore_from_root( opts, transaction_status_sender, cache_block_meta_sender, + entry_notification_sender, &mut timing, accounts_background_request_sender, )? @@ -896,6 +902,7 @@ fn confirm_full_slot( recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, transaction_status_sender: Option<&TransactionStatusSender>, + entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: Option<&ReplayVoteSender>, timing: &mut ExecuteTimings, ) -> result::Result<(), BlockstoreProcessorError> { @@ -910,6 +917,7 @@ fn confirm_full_slot( progress, skip_verification, transaction_status_sender, + entry_notification_sender, replay_vote_sender, recyclers, opts.allow_dead_slots, @@ -1055,6 +1063,7 @@ pub fn confirm_slot( progress: &mut ConfirmationProgress, skip_verification: bool, transaction_status_sender: Option<&TransactionStatusSender>, + entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: Option<&ReplayVoteSender>, recyclers: &VerifyRecyclers, allow_dead_slots: bool, @@ -1084,6 +1093,7 @@ pub fn confirm_slot( progress, skip_verification, transaction_status_sender, + entry_notification_sender, replay_vote_sender, recyclers, log_messages_bytes_limit, @@ -1099,6 +1109,7 @@ fn confirm_slot_entries( progress: &mut ConfirmationProgress, skip_verification: bool, transaction_status_sender: Option<&TransactionStatusSender>, + _entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: Option<&ReplayVoteSender>, recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, @@ -1281,6 +1292,7 @@ fn process_bank_0( opts: &ProcessOptions, recyclers: &VerifyRecyclers, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, ) { assert_eq!(bank0.slot(), 0); let mut progress = ConfirmationProgress::new(bank0.last_blockhash()); @@ -1291,6 +1303,7 @@ fn process_bank_0( recyclers, &mut progress, None, + entry_notification_sender, None, &mut ExecuteTimings::default(), ) @@ -1371,6 +1384,7 @@ fn load_frozen_forks( opts: &ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, timing: &mut ExecuteTimings, accounts_background_request_sender: &AbsRequestSender, ) -> result::Result<(u64, usize), BlockstoreProcessorError> { @@ -1458,6 +1472,7 @@ fn load_frozen_forks( &mut progress, transaction_status_sender, cache_block_meta_sender, + entry_notification_sender, None, timing, ) @@ -1648,6 +1663,7 @@ fn supermajority_root_from_vote_accounts( // Processes and replays the contents of a single slot, returns Error // if failed to play the slot +#[allow(clippy::too_many_arguments)] fn process_single_slot( blockstore: &Blockstore, bank: &Arc, @@ -1656,6 +1672,7 @@ fn process_single_slot( progress: &mut ConfirmationProgress, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: Option<&ReplayVoteSender>, timing: &mut ExecuteTimings, ) -> result::Result<(), BlockstoreProcessorError> { @@ -1668,6 +1685,7 @@ fn process_single_slot( recyclers, progress, transaction_status_sender, + entry_notification_sender, replay_vote_sender, timing, ) @@ -3384,7 +3402,7 @@ pub mod tests { vec![entry_1, tick, entry_2.clone()], true, None, - None + None, ), Ok(()) ); @@ -3562,7 +3580,7 @@ pub mod tests { ..ProcessOptions::default() }; let recyclers = VerifyRecyclers::default(); - process_bank_0(&bank0, &blockstore, &opts, &recyclers, None); + process_bank_0(&bank0, &blockstore, &opts, &recyclers, None, None); let bank1 = bank_forks.insert(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); confirm_full_slot( &blockstore, @@ -3572,6 +3590,7 @@ pub mod tests { &mut ConfirmationProgress::new(bank0.last_blockhash()), None, None, + None, &mut ExecuteTimings::default(), ) .unwrap(); @@ -3592,6 +3611,7 @@ pub mod tests { &opts, None, None, + None, &AbsRequestSender::default(), ) .unwrap(); @@ -4214,6 +4234,7 @@ pub mod tests { false, None, None, + None, &VerifyRecyclers::default(), None, &PrioritizationFeeCache::new(0u64), @@ -4357,6 +4378,7 @@ pub mod tests { false, Some(&transaction_status_sender), None, + None, &VerifyRecyclers::default(), None, &PrioritizationFeeCache::new(0u64), @@ -4402,6 +4424,7 @@ pub mod tests { false, Some(&transaction_status_sender), None, + None, &VerifyRecyclers::default(), None, &PrioritizationFeeCache::new(0u64), diff --git a/rpc/src/entry_notifier_interface.rs b/ledger/src/entry_notifier_interface.rs similarity index 100% rename from rpc/src/entry_notifier_interface.rs rename to ledger/src/entry_notifier_interface.rs diff --git a/ledger/src/entry_notifier_service.rs b/ledger/src/entry_notifier_service.rs new file mode 100644 index 0000000000..b07f3ce1a2 --- /dev/null +++ b/ledger/src/entry_notifier_service.rs @@ -0,0 +1,78 @@ +use { + crate::entry_notifier_interface::EntryNotifierLock, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, + solana_entry::entry::EntrySummary, + solana_sdk::clock::Slot, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, + }, +}; + +pub struct EntryNotification { + pub slot: Slot, + pub index: usize, + pub entry: EntrySummary, +} + +pub type EntryNotifierSender = Sender; +pub type EntryNotifierReceiver = Receiver; + +pub struct EntryNotifierService { + sender: EntryNotifierSender, + thread_hdl: JoinHandle<()>, +} + +impl EntryNotifierService { + pub fn new(entry_notifier: EntryNotifierLock, exit: &Arc) -> Self { + let exit = exit.clone(); + let (entry_notification_sender, entry_notification_receiver) = unbounded(); + let thread_hdl = Builder::new() + .name("solEntryNotif".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + + if let Err(RecvTimeoutError::Disconnected) = + Self::notify_entry(&entry_notification_receiver, entry_notifier.clone()) + { + break; + } + }) + .unwrap(); + Self { + sender: entry_notification_sender, + thread_hdl, + } + } + + fn notify_entry( + entry_notification_receiver: &EntryNotifierReceiver, + entry_notifier: EntryNotifierLock, + ) -> Result<(), RecvTimeoutError> { + let EntryNotification { slot, index, entry } = + entry_notification_receiver.recv_timeout(Duration::from_secs(1))?; + entry_notifier + .write() + .unwrap() + .notify_entry(slot, index, &entry); + Ok(()) + } + + pub fn sender(&self) -> &EntryNotifierSender { + &self.sender + } + + pub fn sender_cloned(&self) -> EntryNotifierSender { + self.sender.clone() + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 98b677f60a..274bd6d369 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -14,6 +14,8 @@ pub mod blockstore_meta; pub mod blockstore_metrics; pub mod blockstore_options; pub mod blockstore_processor; +pub mod entry_notifier_interface; +pub mod entry_notifier_service; pub mod genesis_utils; pub mod leader_schedule; pub mod leader_schedule_cache; diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index ab3bd7457b..28e6f327d9 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -2183,6 +2183,7 @@ fn create_snapshot_to_hard_fork( None, None, None, + None, &Arc::default(), ) .unwrap(); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index e7d119f207..129fd844f7 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5009,6 +5009,7 @@ dependencies = [ "serde_json", "solana-entry", "solana-geyser-plugin-interface", + "solana-ledger", "solana-measure", "solana-metrics", "solana-rpc", diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 7693976587..d021155e89 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,6 +1,5 @@ #![allow(clippy::integer_arithmetic)] mod cluster_tpu_info; -pub mod entry_notifier_interface; pub mod max_slots; pub mod optimistically_confirmed_bank_tracker; pub mod parsed_token_accounts;