From 7cf50e60fc9a69bd6986f3c91f7fea44dd52a29c Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 3 May 2023 18:50:00 +0800 Subject: [PATCH] Fixed missing Root notifications via geyser plugin framework (#31180) * Fixed missing Root notifications via geyser plugin framework * Renamed a variable * fmt issue * Do not try the loop if no subscribers. * Addressing some feedback -- passing parent roots from replay_stage to avoid race conditions * clippy issue * Address some reviewing findings * Addressed some feedback from Carl * fix a clippy issue * Added comments on optimistically_confirmed_bank_tracker module to explain the workflow * Addressed Trent's review --- core/src/replay_stage.rs | 32 ++- core/src/tvu.rs | 4 +- core/src/validator.rs | 10 +- .../src/geyser_plugin_service.rs | 6 +- .../src/slot_status_observer.rs | 16 +- .../optimistically_confirmed_bank_tracker.rs | 217 +++++++++++++++--- rpc/src/rpc.rs | 5 + rpc/src/rpc_subscriptions.rs | 12 + 8 files changed, 252 insertions(+), 50 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index f8239fa36f..a846271af7 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -50,7 +50,7 @@ use { solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, solana_program_runtime::timings::ExecuteTimings, solana_rpc::{ - optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, + optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig}, rpc_subscriptions::RpcSubscriptions, }, solana_rpc_client_api::response::SlotUpdate, @@ -237,7 +237,7 @@ pub struct ReplayStageConfig { pub transaction_status_sender: Option, pub rewards_recorder_sender: Option, pub cache_block_meta_sender: Option, - pub bank_notification_sender: Option, + pub bank_notification_sender: Option, pub wait_for_vote_to_start_leader: bool, pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender, pub tower_storage: Arc, @@ -1996,7 +1996,7 @@ impl ReplayStage { rpc_subscriptions: &Arc, block_commitment_cache: &Arc>, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, - bank_notification_sender: &Option, + bank_notification_sender: &Option, duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes, @@ -2022,8 +2022,19 @@ impl ReplayStage { .get(new_root) .expect("Root bank doesn't exist"); let mut rooted_banks = root_bank.parents(); + let oldest_parent = rooted_banks.last().map(|last| last.parent_slot()); rooted_banks.push(root_bank.clone()); let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect(); + // The following differs from rooted_slots by including the parent slot of the oldest parent bank. + let rooted_slots_with_parents = bank_notification_sender + .as_ref() + .map_or(false, |sender| sender.should_send_parents) + .then(|| { + let mut new_chain = rooted_slots.clone(); + new_chain.push(oldest_parent.unwrap_or(bank.parent_slot())); + new_chain + }); + // Call leader schedule_cache.set_root() before blockstore.set_root() because // bank_forks.root is consumed by repair_service to update gossip, so we don't want to // get shreds for repair on gossip before we update leader schedule, otherwise they may @@ -2059,8 +2070,16 @@ impl ReplayStage { rpc_subscriptions.notify_roots(rooted_slots); if let Some(sender) = bank_notification_sender { sender - .send(BankNotification::Root(root_bank)) + .sender + .send(BankNotification::NewRootBank(root_bank)) .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); + + if let Some(new_chain) = rooted_slots_with_parents { + sender + .sender + .send(BankNotification::NewRootedChain(new_chain)) + .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); + } } latest_root_senders.iter().for_each(|s| { if let Err(e) = s.send(new_root) { @@ -2574,7 +2593,7 @@ impl ReplayStage { transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, - bank_notification_sender: &Option, + bank_notification_sender: &Option, rewards_recorder_sender: &Option, rpc_subscriptions: &Arc, duplicate_slots_tracker: &mut DuplicateSlotsTracker, @@ -2698,6 +2717,7 @@ impl ReplayStage { ); if let Some(sender) = bank_notification_sender { sender + .sender .send(BankNotification::Frozen(bank.clone())) .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); } @@ -2766,7 +2786,7 @@ impl ReplayStage { verify_recyclers: &VerifyRecyclers, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, replay_vote_sender: &ReplayVoteSender, - bank_notification_sender: &Option, + bank_notification_sender: &Option, rewards_recorder_sender: &Option, rpc_subscriptions: &Arc, duplicate_slots_tracker: &mut DuplicateSlotsTracker, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 2f9f1387c2..cc0f531584 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -41,7 +41,7 @@ use { }, solana_poh::poh_recorder::PohRecorder, solana_rpc::{ - max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSender, + max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSenderConfig, rpc_subscriptions::RpcSubscriptions, }, solana_runtime::{ @@ -126,7 +126,7 @@ impl Tvu { verified_vote_receiver: VerifiedVoteReceiver, replay_vote_sender: ReplayVoteSender, completed_data_sets_sender: CompletedDataSetsSender, - bank_notification_sender: Option, + bank_notification_sender: Option, gossip_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver, tvu_config: TvuConfig, max_slots: &Arc, diff --git a/core/src/validator.rs b/core/src/validator.rs index f41ff1a554..8b8ec7c85a 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -63,7 +63,8 @@ use { solana_rpc::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ - OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, + BankNotificationSenderConfig, OptimisticallyConfirmedBank, + OptimisticallyConfirmedBankTracker, }, rpc::JsonRpcConfig, rpc_completed_slots_service::RpcCompletedSlotsService, @@ -945,7 +946,10 @@ impl Validator { rpc_subscriptions.clone(), confirmed_bank_subscribers, )), - Some(bank_notification_sender), + Some(BankNotificationSenderConfig { + sender: bank_notification_sender, + should_send_parents: geyser_plugin_service.is_some(), + }), ) } else { (None, None, None, None) @@ -1145,7 +1149,7 @@ impl Validator { gossip_verified_vote_hash_sender, replay_vote_receiver, replay_vote_sender, - bank_notification_sender, + bank_notification_sender.map(|sender| sender.sender), config.tpu_coalesce, cluster_confirmed_slot_sender, &connection_cache, diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index 808f52c079..ece7bb1c63 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -13,7 +13,7 @@ use { log::*, solana_rpc::{ entry_notifier_interface::EntryNotifierLock, - optimistically_confirmed_bank_tracker::BankNotification, + optimistically_confirmed_bank_tracker::SlotNotification, transaction_notifier_interface::TransactionNotifierLock, }, solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, @@ -50,14 +50,14 @@ impl GeyserPluginService { /// The rest of the JSON fields' definition is up to to the concrete plugin implementation /// It is usually used to configure the connection information for the external data store. pub fn new( - confirmed_bank_receiver: Receiver, + confirmed_bank_receiver: Receiver, geyser_plugin_config_files: &[PathBuf], ) -> Result { Self::new_with_receiver(confirmed_bank_receiver, geyser_plugin_config_files, None) } pub fn new_with_receiver( - confirmed_bank_receiver: Receiver, + confirmed_bank_receiver: Receiver, geyser_plugin_config_files: &[PathBuf], rpc_to_plugin_manager_receiver_and_exit: Option<( Receiver, diff --git a/geyser-plugin-manager/src/slot_status_observer.rs b/geyser-plugin-manager/src/slot_status_observer.rs index b2f6bf5f79..7eba6e54eb 100644 --- a/geyser-plugin-manager/src/slot_status_observer.rs +++ b/geyser-plugin-manager/src/slot_status_observer.rs @@ -1,7 +1,7 @@ use { crate::slot_status_notifier::SlotStatusNotifier, crossbeam_channel::Receiver, - solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, + solana_rpc::optimistically_confirmed_bank_tracker::SlotNotification, std::{ sync::{ atomic::{AtomicBool, Ordering}, @@ -19,7 +19,7 @@ pub(crate) struct SlotStatusObserver { impl SlotStatusObserver { pub fn new( - bank_notification_receiver: Receiver, + bank_notification_receiver: Receiver, slot_status_notifier: SlotStatusNotifier, ) -> Self { let exit_updated_slot_server = Arc::new(AtomicBool::new(false)); @@ -43,7 +43,7 @@ impl SlotStatusObserver { } fn run_bank_notification_receiver( - bank_notification_receiver: Receiver, + bank_notification_receiver: Receiver, exit: Arc, slot_status_notifier: SlotStatusNotifier, ) -> JoinHandle<()> { @@ -53,23 +53,23 @@ impl SlotStatusObserver { while !exit.load(Ordering::Relaxed) { if let Ok(slot) = bank_notification_receiver.recv() { match slot { - BankNotification::OptimisticallyConfirmed(slot) => { + SlotNotification::OptimisticallyConfirmed(slot) => { slot_status_notifier .read() .unwrap() .notify_slot_confirmed(slot, None); } - BankNotification::Frozen(bank) => { + SlotNotification::Frozen((slot, parent)) => { slot_status_notifier .read() .unwrap() - .notify_slot_processed(bank.slot(), Some(bank.parent_slot())); + .notify_slot_processed(slot, Some(parent)); } - BankNotification::Root(bank) => { + SlotNotification::Root((slot, parent)) => { slot_status_notifier .read() .unwrap() - .notify_slot_rooted(bank.slot(), Some(bank.parent_slot())); + .notify_slot_rooted(slot, Some(parent)); } } } diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index 9693999d23..dcc3792a22 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -1,6 +1,12 @@ //! The `optimistically_confirmed_bank_tracker` module implements a threaded service to track the //! most recent optimistically confirmed bank for use in rpc services, and triggers gossip -//! subscription notifications +//! subscription notifications. +//! This module also supports notifying of slot status for subscribers using the SlotNotificationSender. +//! It receives the BankNotification events, transforms them into SlotNotification and sends them via +//! SlotNotificationSender in the following way: +//! BankNotification::OptimisticallyConfirmed --> SlotNotification::OptimisticallyConfirmed +//! BankNotification::Frozen --> SlotNotification::Frozen +//! BankNotification::NewRootedChain --> SlotNotification::Root for the roots in the chain. use { crate::rpc_subscriptions::RpcSubscriptions, @@ -35,7 +41,18 @@ impl OptimisticallyConfirmedBank { pub enum BankNotification { OptimisticallyConfirmed(Slot), Frozen(Arc), - Root(Arc), + NewRootBank(Arc), + /// The newly rooted slot chain including the parent slot of the oldest bank in the rooted chain. + NewRootedChain(Vec), +} + +#[derive(Clone, Debug)] +pub enum SlotNotification { + OptimisticallyConfirmed(Slot), + /// The (Slot, Parent Slot) pair for the slot frozen + Frozen((Slot, Slot)), + /// The (Slot, Parent Slot) pair for the root slot + Root((Slot, Slot)), } impl std::fmt::Debug for BankNotification { @@ -45,7 +62,8 @@ impl std::fmt::Debug for BankNotification { write!(f, "OptimisticallyConfirmed({slot:?})") } BankNotification::Frozen(bank) => write!(f, "Frozen({})", bank.slot()), - BankNotification::Root(bank) => write!(f, "Root({})", bank.slot()), + BankNotification::NewRootBank(bank) => write!(f, "Root({})", bank.slot()), + BankNotification::NewRootedChain(chain) => write!(f, "RootedChain({chain:?})"), } } } @@ -53,6 +71,15 @@ impl std::fmt::Debug for BankNotification { pub type BankNotificationReceiver = Receiver; pub type BankNotificationSender = Sender; +#[derive(Clone)] +pub struct BankNotificationSenderConfig { + pub sender: BankNotificationSender, + pub should_send_parents: bool, +} + +pub type SlotNotificationReceiver = Receiver; +pub type SlotNotificationSender = Sender; + pub struct OptimisticallyConfirmedBankTracker { thread_hdl: JoinHandle<()>, } @@ -64,12 +91,13 @@ impl OptimisticallyConfirmedBankTracker { bank_forks: Arc>, optimistically_confirmed_bank: Arc>, subscriptions: Arc, - bank_notification_subscribers: Option>>>, + slot_notification_subscribers: Option>>>, ) -> Self { let exit_ = exit.clone(); let mut pending_optimistically_confirmed_banks = HashSet::new(); let mut last_notified_confirmed_slot: Slot = 0; let mut highest_confirmed_slot: Slot = 0; + let mut newest_root_slot: Slot = 0; let thread_hdl = Builder::new() .name("solOpConfBnkTrk".to_string()) .spawn(move || loop { @@ -85,7 +113,8 @@ impl OptimisticallyConfirmedBankTracker { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, - &bank_notification_subscribers, + &mut newest_root_slot, + &slot_notification_subscribers, ) { break; } @@ -102,7 +131,8 @@ impl OptimisticallyConfirmedBankTracker { pending_optimistically_confirmed_banks: &mut HashSet, last_notified_confirmed_slot: &mut Slot, highest_confirmed_slot: &mut Slot, - bank_notification_subscribers: &Option>>>, + newest_root_slot: &mut Slot, + slot_notification_subscribers: &Option>>>, ) -> Result<(), RecvTimeoutError> { let notification = receiver.recv_timeout(Duration::from_secs(1))?; Self::process_notification( @@ -113,17 +143,18 @@ impl OptimisticallyConfirmedBankTracker { pending_optimistically_confirmed_banks, last_notified_confirmed_slot, highest_confirmed_slot, - bank_notification_subscribers, + newest_root_slot, + slot_notification_subscribers, ); Ok(()) } fn notify_slot_status( - bank_notification_subscribers: &Option>>>, - notification: BankNotification, + slot_notification_subscribers: &Option>>>, + notification: SlotNotification, ) { - if let Some(bank_notification_subscribers) = bank_notification_subscribers { - for sender in bank_notification_subscribers.read().unwrap().iter() { + if let Some(slot_notification_subscribers) = slot_notification_subscribers { + for sender in slot_notification_subscribers.read().unwrap().iter() { match sender.send(notification.clone()) { Ok(_) => {} Err(err) => { @@ -143,7 +174,7 @@ impl OptimisticallyConfirmedBankTracker { bank: &Arc, last_notified_confirmed_slot: &mut Slot, pending_optimistically_confirmed_banks: &mut HashSet, - bank_notification_subscribers: &Option>>>, + slot_notification_subscribers: &Option>>>, ) { if bank.is_frozen() { if bank.slot() > *last_notified_confirmed_slot { @@ -154,8 +185,8 @@ impl OptimisticallyConfirmedBankTracker { subscriptions.notify_gossip_subscribers(bank.slot()); *last_notified_confirmed_slot = bank.slot(); Self::notify_slot_status( - bank_notification_subscribers, - BankNotification::OptimisticallyConfirmed(bank.slot()), + slot_notification_subscribers, + SlotNotification::OptimisticallyConfirmed(bank.slot()), ); } } else if bank.slot() > bank_forks.read().unwrap().root_bank().slot() { @@ -171,7 +202,7 @@ impl OptimisticallyConfirmedBankTracker { slot_threshold: Slot, last_notified_confirmed_slot: &mut Slot, pending_optimistically_confirmed_banks: &mut HashSet, - bank_notification_subscribers: &Option>>>, + slot_notification_subscribers: &Option>>>, ) { for confirmed_bank in bank.clone().parents_inclusive().iter().rev() { if confirmed_bank.slot() > slot_threshold { @@ -185,12 +216,40 @@ impl OptimisticallyConfirmedBankTracker { confirmed_bank, last_notified_confirmed_slot, pending_optimistically_confirmed_banks, - bank_notification_subscribers, + slot_notification_subscribers, ); } } } + fn notify_new_root_slots( + roots: &mut Vec, + newest_root_slot: &mut Slot, + slot_notification_subscribers: &Option>>>, + ) { + if slot_notification_subscribers.is_none() { + return; + } + roots.sort_unstable(); + // The chain are sorted already and must contain at least the parent of a newly rooted slot as the first element + assert!(roots.len() >= 2); + for i in 1..roots.len() { + let root = roots[i]; + if root > *newest_root_slot { + let parent = roots[i - 1]; + debug!( + "Doing SlotNotification::Root for root {}, parent: {}", + root, parent + ); + Self::notify_slot_status( + slot_notification_subscribers, + SlotNotification::Root((root, parent)), + ); + *newest_root_slot = root; + } + } + } + pub fn process_notification( notification: BankNotification, bank_forks: &Arc>, @@ -199,7 +258,8 @@ impl OptimisticallyConfirmedBankTracker { pending_optimistically_confirmed_banks: &mut HashSet, last_notified_confirmed_slot: &mut Slot, highest_confirmed_slot: &mut Slot, - bank_notification_subscribers: &Option>>>, + newest_root_slot: &mut Slot, + slot_notification_subscribers: &Option>>>, ) { debug!("received bank notification: {:?}", notification); match notification { @@ -222,7 +282,7 @@ impl OptimisticallyConfirmedBankTracker { *highest_confirmed_slot, last_notified_confirmed_slot, pending_optimistically_confirmed_banks, - bank_notification_subscribers, + slot_notification_subscribers, ); *highest_confirmed_slot = slot; @@ -258,8 +318,8 @@ impl OptimisticallyConfirmedBankTracker { }); Self::notify_slot_status( - bank_notification_subscribers, - BankNotification::Frozen(bank.clone()), + slot_notification_subscribers, + SlotNotification::Frozen((bank.slot(), bank.parent_slot())), ); } @@ -276,7 +336,7 @@ impl OptimisticallyConfirmedBankTracker { *last_notified_confirmed_slot, last_notified_confirmed_slot, pending_optimistically_confirmed_banks, - bank_notification_subscribers, + slot_notification_subscribers, ); let mut w_optimistically_confirmed_bank = @@ -287,11 +347,7 @@ impl OptimisticallyConfirmedBankTracker { drop(w_optimistically_confirmed_bank); } } - BankNotification::Root(bank) => { - Self::notify_slot_status( - bank_notification_subscribers, - BankNotification::Root(bank.clone()), - ); + BankNotification::NewRootBank(bank) => { let root_slot = bank.slot(); let mut w_optimistically_confirmed_bank = optimistically_confirmed_bank.write().unwrap(); @@ -299,8 +355,16 @@ impl OptimisticallyConfirmedBankTracker { w_optimistically_confirmed_bank.bank = bank; } drop(w_optimistically_confirmed_bank); + pending_optimistically_confirmed_banks.retain(|&s| s > root_slot); } + BankNotification::NewRootedChain(mut roots) => { + Self::notify_new_root_slots( + &mut roots, + newest_root_slot, + slot_notification_subscribers, + ); + } } } @@ -317,6 +381,7 @@ impl OptimisticallyConfirmedBankTracker { mod tests { use { super::*, + crossbeam_channel::unbounded, solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, solana_runtime::{ accounts_background_service::AbsRequestSender, commitment::BlockCommitmentCache, @@ -325,6 +390,26 @@ mod tests { std::sync::atomic::AtomicU64, }; + /// Given a bank get the parent slot chains, this include the parent slot of the oldest parent bank. + fn get_parent_chains(bank: &Arc) -> Vec { + let mut parents = bank.parents(); + let oldest_parent = parents.last().map(|last| last.parent_slot()); + parents.push(bank.clone()); + let mut rooted_slots: Vec<_> = parents.iter().map(|bank| bank.slot()).collect(); + rooted_slots.push(oldest_parent.unwrap_or(bank.parent_slot())); + rooted_slots + } + + /// Receive the Root notifications from the channel, if no item received within 100 ms, break and return all + /// of those received. + fn get_root_notifications(receiver: &Receiver) -> Vec { + let mut notifications = Vec::new(); + while let Ok(notification) = receiver.recv_timeout(Duration::from_millis(100)) { + notifications.push(notification); + } + notifications + } + #[test] fn test_process_notification() { let exit = Arc::new(AtomicBool::new(false)); @@ -360,6 +445,8 @@ mod tests { assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0); let mut highest_confirmed_slot: Slot = 0; + let mut newest_root_slot: Slot = 0; + let mut last_notified_confirmed_slot: Slot = 0; OptimisticallyConfirmedBankTracker::process_notification( BankNotification::OptimisticallyConfirmed(2), @@ -369,6 +456,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut newest_root_slot, &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); @@ -383,6 +471,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut newest_root_slot, &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); @@ -397,6 +486,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut newest_root_slot, &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); @@ -416,6 +506,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut newest_root_slot, &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); @@ -434,6 +525,7 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut newest_root_slot, &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); @@ -445,20 +537,48 @@ mod tests { let bank5 = Bank::new_from_parent(&bank4, &Pubkey::default(), 5); bank_forks.write().unwrap().insert(bank5); let bank5 = bank_forks.read().unwrap().get(5).unwrap(); + + let mut bank_notification_senders = Vec::new(); + let (sender, receiver) = unbounded(); + bank_notification_senders.push(sender); + + let subscribers = Some(Arc::new(RwLock::new(bank_notification_senders))); + let parent_roots = get_parent_chains(&bank5); OptimisticallyConfirmedBankTracker::process_notification( - BankNotification::Root(bank5), + BankNotification::NewRootBank(bank5), &bank_forks, &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, - &None, + &mut newest_root_slot, + &subscribers, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); assert!(!pending_optimistically_confirmed_banks.contains(&4)); assert_eq!(highest_confirmed_slot, 4); + // The newest_root_slot is updated via NewRootedChain only + assert_eq!(newest_root_slot, 0); + + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::NewRootedChain(parent_roots), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, + &mut newest_root_slot, + &subscribers, + ); + + assert_eq!(newest_root_slot, 5); + + // Obtain the root notifications, we expect 5, including that for bank5. + let notifications = get_root_notifications(&receiver); + assert_eq!(notifications.len(), 5); // Banks <= root do not get added to pending list, even if not frozen let bank5 = bank_forks.read().unwrap().get(5).unwrap(); @@ -479,11 +599,52 @@ mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut newest_root_slot, &None, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); assert!(!pending_optimistically_confirmed_banks.contains(&6)); assert_eq!(highest_confirmed_slot, 4); + assert_eq!(newest_root_slot, 5); + + let bank7 = bank_forks.read().unwrap().get(7).unwrap(); + + let parent_roots = get_parent_chains(&bank7); + + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::NewRootBank(bank7), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, + &mut newest_root_slot, + &subscribers, + ); + assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 7); + assert_eq!(pending_optimistically_confirmed_banks.len(), 0); + assert!(!pending_optimistically_confirmed_banks.contains(&6)); + assert_eq!(highest_confirmed_slot, 4); + assert_eq!(newest_root_slot, 5); + + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::NewRootedChain(parent_roots), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, + &mut newest_root_slot, + &subscribers, + ); + + assert_eq!(newest_root_slot, 7); + + // Obtain the root notifications, we expect 1, which is for bank7 only as its parent bank5 is already notified. + let notifications = get_root_notifications(&receiver); + assert_eq!(notifications.len(), 1); } } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 69a746327f..9aae8a3188 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -8340,6 +8340,7 @@ pub mod tests { let slot: Slot = serde_json::from_value(json["result"].clone()).unwrap(); assert_eq!(slot, 0); let mut highest_confirmed_slot: Slot = 0; + let mut highest_root_slot: Slot = 0; let mut last_notified_confirmed_slot: Slot = 0; OptimisticallyConfirmedBankTracker::process_notification( @@ -8350,6 +8351,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); let req = @@ -8368,6 +8370,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); let req = @@ -8386,6 +8389,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); let req = @@ -8405,6 +8409,7 @@ pub mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); let req = diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 7eca2ea8b5..f471cbd832 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -2036,6 +2036,7 @@ pub(crate) mod tests { })); let mut highest_confirmed_slot: Slot = 0; + let mut highest_root_slot: Slot = 0; let mut last_notified_confirmed_slot: Slot = 0; // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is unfrozen, we expect // to see transaction for alice and bob to be notified in order. @@ -2047,6 +2048,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); @@ -2098,6 +2100,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); @@ -2206,6 +2209,7 @@ pub(crate) mod tests { })); let mut highest_confirmed_slot: Slot = 0; + let mut highest_root_slot: Slot = 0; let mut last_notified_confirmed_slot: Slot = 0; // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is not in the bankforks, we do not // expect to see any RPC notifications. @@ -2217,6 +2221,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); @@ -2320,6 +2325,7 @@ pub(crate) mod tests { })); let mut highest_confirmed_slot: Slot = 0; + let mut highest_root_slot: Slot = 0; let mut last_notified_confirmed_slot: Slot = 0; // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is not in the bankforks, we expect // to see transaction for alice and bob to be notified only when bank3 is added to the fork and @@ -2332,6 +2338,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); @@ -2385,6 +2392,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); @@ -2810,6 +2818,7 @@ pub(crate) mod tests { // First, notify the unfrozen bank first to queue pending notification let mut highest_confirmed_slot: Slot = 0; + let mut highest_root_slot: Slot = 0; let mut last_notified_confirmed_slot: Slot = 0; OptimisticallyConfirmedBankTracker::process_notification( BankNotification::OptimisticallyConfirmed(2), @@ -2819,6 +2828,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); @@ -2832,6 +2842,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); @@ -2885,6 +2896,7 @@ pub(crate) mod tests { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, + &mut highest_root_slot, &None, ); let response = receiver1.recv();