//! The `pubsub` module implements a threaded subscription service on client RPC request use crate::{ optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc::{get_parsed_token_account, get_parsed_token_accounts}, }; use core::hash::Hash; use jsonrpc_pubsub::{ typed::{Sink, Subscriber}, SubscriptionId, }; use serde::Serialize; use solana_account_decoder::{parse_token::spl_token_id_v2_0, UiAccount, UiAccountEncoding}; use solana_client::{ rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, rpc_filter::RpcFilterType, rpc_response::{ ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcKeyedAccount, RpcLogsResponse, RpcResponseContext, RpcSignatureResult, SlotInfo, }, }; use solana_measure::measure::Measure; use solana_runtime::{ bank::{ Bank, TransactionLogCollectorConfig, TransactionLogCollectorFilter, TransactionLogInfo, }, bank_forks::BankForks, commitment::{BlockCommitmentCache, CommitmentSlots}, }; use solana_sdk::{ account::Account, clock::{Slot, UnixTimestamp}, commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature, transaction, }; use solana_vote_program::vote_state::Vote; use std::{ collections::{HashMap, HashSet}, iter, sync::{ atomic::{AtomicBool, Ordering}, mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, }, sync::{Arc, Mutex, RwLock}, thread::{Builder, JoinHandle}, time::Duration, }; const RECEIVE_DELAY_MILLIS: u64 = 100; trait BankGetTransactionLogsAdapter { fn get_transaction_logs_adapter( &self, stuff: &(Option, bool), ) -> Option>; } impl BankGetTransactionLogsAdapter for Bank { fn get_transaction_logs_adapter( &self, config: &(Option, bool), ) -> Option> { let mut logs = self.get_transaction_logs(config.0.as_ref()); if config.0.is_none() && !config.1 { // Filter out votes if the subscriber doesn't want them logs = logs.map(|logs| logs.into_iter().filter(|log| !log.is_vote).collect()); } logs } } // A more human-friendly version of Vote, with the bank state signature base58 encoded. #[derive(Serialize, Deserialize, Debug)] pub struct RpcVote { pub slots: Vec, pub hash: String, pub timestamp: Option, } enum NotificationEntry { Slot(SlotInfo), Vote(Vote), Root(Slot), Bank(CommitmentSlots), Gossip(Slot), SignaturesReceived((Slot, Vec)), } impl std::fmt::Debug for NotificationEntry { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { NotificationEntry::Root(root) => write!(f, "Root({})", root), NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), NotificationEntry::Bank(commitment_slots) => { write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot) } NotificationEntry::SignaturesReceived(slot_signatures) => { write!(f, "SignaturesReceived({:?})", slot_signatures) } NotificationEntry::Gossip(slot) => write!(f, "Gossip({:?})", slot), } } } struct SubscriptionData { sink: Sink, commitment: CommitmentConfig, last_notified_slot: RwLock, config: Option, } #[derive(Default, Clone)] struct ProgramConfig { filters: Vec, encoding: Option, } type RpcAccountSubscriptions = RwLock< HashMap< Pubkey, HashMap, UiAccountEncoding>>, >, >; type RpcLogsSubscriptions = RwLock< HashMap< (Option, bool), HashMap, ()>>, >, >; type RpcProgramSubscriptions = RwLock< HashMap< Pubkey, HashMap, ProgramConfig>>, >, >; type RpcSignatureSubscriptions = RwLock< HashMap< Signature, HashMap, bool>>, >, >; type RpcSlotSubscriptions = RwLock>>; type RpcVoteSubscriptions = RwLock>>; type RpcRootSubscriptions = RwLock>>; fn add_subscription( subscriptions: &mut HashMap>>, hashmap_key: K, commitment: CommitmentConfig, sub_id: SubscriptionId, subscriber: Subscriber, last_notified_slot: Slot, config: Option, ) where K: Eq + Hash, S: Clone, { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let subscription_data = SubscriptionData { sink, commitment, last_notified_slot: RwLock::new(last_notified_slot), config, }; subscriptions .entry(hashmap_key) .or_default() .insert(sub_id, subscription_data); } fn remove_subscription( subscriptions: &mut HashMap>>, sub_id: &SubscriptionId, ) -> bool where K: Eq + Hash, S: Clone, { let mut found = false; subscriptions.retain(|_, v| { v.retain(|k, _| { let retain = k != sub_id; if !retain { found = true; } retain }); !v.is_empty() }); found } #[allow(clippy::type_complexity)] fn check_commitment_and_notify( subscriptions: &HashMap, T>>>, hashmap_key: &K, bank_forks: &Arc>, commitment_slots: &CommitmentSlots, bank_method: B, filter_results: F, notifier: &RpcNotifier, ) -> HashSet where K: Eq + Hash + Clone + Copy, S: Clone + Serialize, B: Fn(&Bank, &K) -> X, F: Fn(X, &K, Slot, Option, Arc) -> (Box>, Slot), X: Clone + Default, T: Clone, { let mut notified_set: HashSet = HashSet::new(); if let Some(hashmap) = subscriptions.get(hashmap_key) { for ( sub_id, SubscriptionData { sink, commitment, last_notified_slot, config, }, ) in hashmap.iter() { let slot = if commitment.is_finalized() { commitment_slots.highest_confirmed_root } else if commitment.is_confirmed() { commitment_slots.highest_confirmed_slot } else { commitment_slots.slot }; if let Some(bank) = bank_forks.read().unwrap().get(slot).cloned() { let results = bank_method(&bank, hashmap_key); let mut w_last_notified_slot = last_notified_slot.write().unwrap(); let (filter_results, result_slot) = filter_results( results, hashmap_key, *w_last_notified_slot, config.as_ref().cloned(), bank, ); for result in filter_results { notifier.notify( Response { context: RpcResponseContext { slot }, value: result, }, sink, ); notified_set.insert(sub_id.clone()); *w_last_notified_slot = result_slot; } } } } notified_set } struct RpcNotifier; impl RpcNotifier { fn notify(&self, value: T, sink: &Sink) where T: serde::Serialize, { let _ = sink.notify(Ok(value)); } } fn filter_account_result( result: Option<(Account, Slot)>, pubkey: &Pubkey, last_notified_slot: Slot, encoding: Option, bank: Arc, ) -> (Box>, Slot) { // If the account is not found, `last_modified_slot` will default to zero and // we will notify clients that the account no longer exists if we haven't already let (account, last_modified_slot) = result.unwrap_or_default(); // If last_modified_slot < last_notified_slot this means that we last notified for a fork // and should notify that the account state has been reverted. let results: Box> = if last_modified_slot != last_notified_slot { let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); if account.owner == spl_token_id_v2_0() && encoding == UiAccountEncoding::JsonParsed { Box::new(iter::once(get_parsed_token_account(bank, pubkey, account))) } else { Box::new(iter::once(UiAccount::encode( pubkey, account, encoding, None, None, ))) } } else { Box::new(iter::empty()) }; (results, last_modified_slot) } fn filter_signature_result( result: Option>, _signature: &Signature, last_notified_slot: Slot, _config: Option, _bank: Arc, ) -> (Box>, Slot) { ( Box::new(result.into_iter().map(|result| { RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: result.err() }) })), last_notified_slot, ) } fn filter_program_results( accounts: Vec<(Pubkey, Account)>, program_id: &Pubkey, last_notified_slot: Slot, config: Option, bank: Arc, ) -> (Box>, Slot) { let config = config.unwrap_or_default(); let encoding = config.encoding.unwrap_or(UiAccountEncoding::Binary); let filters = config.filters; let accounts_is_empty = accounts.is_empty(); let keyed_accounts = accounts.into_iter().filter(move |(_, account)| { filters.iter().all(|filter_type| match filter_type { RpcFilterType::DataSize(size) => account.data.len() as u64 == *size, RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data), }) }); let accounts: Box> = if program_id == &spl_token_id_v2_0() && encoding == UiAccountEncoding::JsonParsed && !accounts_is_empty { Box::new(get_parsed_token_accounts(bank, keyed_accounts)) } else { Box::new( keyed_accounts.map(move |(pubkey, account)| RpcKeyedAccount { pubkey: pubkey.to_string(), account: UiAccount::encode(&pubkey, account, encoding.clone(), None, None), }), ) }; (accounts, last_notified_slot) } fn filter_logs_results( logs: Option>, _address: &(Option, bool), last_notified_slot: Slot, _config: Option<()>, _bank: Arc, ) -> (Box>, Slot) { match logs { None => (Box::new(iter::empty()), last_notified_slot), Some(logs) => ( Box::new(logs.into_iter().map(|log| RpcLogsResponse { signature: log.signature.to_string(), err: log.result.err(), logs: log.log_messages, })), last_notified_slot, ), } } fn total_nested_subscriptions( subscription_map: &RwLock>>, ) -> usize { subscription_map .read() .unwrap() .iter() .fold(0, |acc, x| acc + x.1.len()) } #[derive(Clone)] struct Subscriptions { account_subscriptions: Arc, program_subscriptions: Arc, logs_subscriptions: Arc, signature_subscriptions: Arc, gossip_account_subscriptions: Arc, gossip_logs_subscriptions: Arc, gossip_program_subscriptions: Arc, gossip_signature_subscriptions: Arc, slot_subscriptions: Arc, vote_subscriptions: Arc, root_subscriptions: Arc, } impl Subscriptions { fn total(&self) -> usize { let mut total = 0; total += total_nested_subscriptions(&self.account_subscriptions); total += total_nested_subscriptions(&self.program_subscriptions); total += total_nested_subscriptions(&self.logs_subscriptions); total += total_nested_subscriptions(&self.signature_subscriptions); total += total_nested_subscriptions(&self.gossip_account_subscriptions); total += total_nested_subscriptions(&self.gossip_logs_subscriptions); total += total_nested_subscriptions(&self.gossip_program_subscriptions); total += total_nested_subscriptions(&self.gossip_signature_subscriptions); total += self.slot_subscriptions.read().unwrap().len(); total += self.vote_subscriptions.read().unwrap().len(); total += self.root_subscriptions.read().unwrap().len(); total } } pub struct RpcSubscriptions { subscriptions: Subscriptions, notification_sender: Arc>>, t_cleanup: Option>, bank_forks: Arc>, block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, exit: Arc, enable_vote_subscription: bool, } impl Drop for RpcSubscriptions { fn drop(&mut self) { self.shutdown().unwrap_or_else(|err| { warn!("RPC Notification - shutdown error: {:?}", err); }); } } impl RpcSubscriptions { pub fn new( exit: &Arc, bank_forks: Arc>, block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, ) -> Self { Self::new_with_vote_subscription( exit, bank_forks, block_commitment_cache, optimistically_confirmed_bank, false, ) } pub fn new_with_vote_subscription( exit: &Arc, bank_forks: Arc>, block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, enable_vote_subscription: bool, ) -> Self { let (notification_sender, notification_receiver): ( Sender, Receiver, ) = std::sync::mpsc::channel(); let account_subscriptions = Arc::new(RpcAccountSubscriptions::default()); let logs_subscriptions = Arc::new(RpcLogsSubscriptions::default()); let program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); let gossip_account_subscriptions = Arc::new(RpcAccountSubscriptions::default()); let gossip_logs_subscriptions = Arc::new(RpcLogsSubscriptions::default()); let gossip_program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); let gossip_signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default()); let vote_subscriptions = Arc::new(RpcVoteSubscriptions::default()); let root_subscriptions = Arc::new(RpcRootSubscriptions::default()); let notification_sender = Arc::new(Mutex::new(notification_sender)); let _bank_forks = bank_forks.clone(); let _block_commitment_cache = block_commitment_cache.clone(); let exit_clone = exit.clone(); let subscriptions = Subscriptions { account_subscriptions, logs_subscriptions, program_subscriptions, signature_subscriptions, gossip_account_subscriptions, gossip_logs_subscriptions, gossip_program_subscriptions, gossip_signature_subscriptions, slot_subscriptions, vote_subscriptions, root_subscriptions, }; let _subscriptions = subscriptions.clone(); let notifier = RpcNotifier {}; let t_cleanup = Builder::new() .name("solana-rpc-notifications".to_string()) .spawn(move || { Self::process_notifications( exit_clone, notifier, notification_receiver, _subscriptions, _bank_forks, ); }) .unwrap(); Self { subscriptions, notification_sender, t_cleanup: Some(t_cleanup), bank_forks, block_commitment_cache, optimistically_confirmed_bank, exit: exit.clone(), enable_vote_subscription, } } // For tests only... pub fn default_with_bank_forks(bank_forks: Arc>) -> Self { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); Self::new_with_vote_subscription( &Arc::new(AtomicBool::new(false)), bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, true, ) } fn check_account( pubkey: &Pubkey, bank_forks: &Arc>, account_subscriptions: Arc, notifier: &RpcNotifier, commitment_slots: &CommitmentSlots, ) -> HashSet { let subscriptions = account_subscriptions.read().unwrap(); check_commitment_and_notify( &subscriptions, pubkey, bank_forks, commitment_slots, Bank::get_account_modified_slot, filter_account_result, notifier, ) } fn check_logs( address_with_enable_votes_flag: &(Option, bool), bank_forks: &Arc>, logs_subscriptions: Arc, notifier: &RpcNotifier, commitment_slots: &CommitmentSlots, ) -> HashSet { let subscriptions = logs_subscriptions.read().unwrap(); check_commitment_and_notify( &subscriptions, address_with_enable_votes_flag, bank_forks, commitment_slots, Bank::get_transaction_logs_adapter, filter_logs_results, notifier, ) } fn check_program( program_id: &Pubkey, bank_forks: &Arc>, program_subscriptions: Arc, notifier: &RpcNotifier, commitment_slots: &CommitmentSlots, ) -> HashSet { let subscriptions = program_subscriptions.read().unwrap(); check_commitment_and_notify( &subscriptions, program_id, bank_forks, commitment_slots, Bank::get_program_accounts_modified_since_parent, filter_program_results, notifier, ) } fn check_signature( signature: &Signature, bank_forks: &Arc>, signature_subscriptions: Arc, notifier: &RpcNotifier, commitment_slots: &CommitmentSlots, ) -> HashSet { let mut subscriptions = signature_subscriptions.write().unwrap(); let notified_ids = check_commitment_and_notify( &subscriptions, signature, bank_forks, commitment_slots, Bank::get_signature_status_processed_since_parent, filter_signature_result, notifier, ); if let Some(subscription_ids) = subscriptions.get_mut(signature) { subscription_ids.retain(|k, _| !notified_ids.contains(k)); if subscription_ids.is_empty() { subscriptions.remove(&signature); } } notified_ids } pub fn total(&self) -> usize { self.subscriptions.total() } pub fn add_account_subscription( &self, pubkey: Pubkey, config: Option, sub_id: SubscriptionId, subscriber: Subscriber>, ) { let config = config.unwrap_or_default(); let commitment = config .commitment .unwrap_or_else(CommitmentConfig::confirmed); let slot = if commitment.is_finalized() { self.block_commitment_cache .read() .unwrap() .highest_confirmed_root() } else if commitment.is_confirmed() { self.optimistically_confirmed_bank .read() .unwrap() .bank .slot() } else { self.block_commitment_cache.read().unwrap().slot() }; let last_notified_slot = if let Some((_account, slot)) = self .bank_forks .read() .unwrap() .get(slot) .and_then(|bank| bank.get_account_modified_slot(&pubkey)) { slot } else { 0 }; let mut subscriptions = if commitment.is_confirmed() { self.subscriptions .gossip_account_subscriptions .write() .unwrap() } else { self.subscriptions.account_subscriptions.write().unwrap() }; add_subscription( &mut subscriptions, pubkey, commitment, sub_id, subscriber, last_notified_slot, config.encoding, ); } pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap(); if remove_subscription(&mut subscriptions, id) { true } else { let mut subscriptions = self .subscriptions .gossip_account_subscriptions .write() .unwrap(); remove_subscription(&mut subscriptions, id) } } pub fn add_program_subscription( &self, program_id: Pubkey, config: Option, sub_id: SubscriptionId, subscriber: Subscriber>, ) { let config = config.unwrap_or_default(); let commitment = config .account_config .commitment .unwrap_or_else(CommitmentConfig::confirmed); let mut subscriptions = if commitment.is_confirmed() { self.subscriptions .gossip_program_subscriptions .write() .unwrap() } else { self.subscriptions.program_subscriptions.write().unwrap() }; add_subscription( &mut subscriptions, program_id, commitment, sub_id, subscriber, 0, // last_notified_slot is not utilized for program subscriptions Some(ProgramConfig { filters: config.filters.unwrap_or_default(), encoding: config.account_config.encoding, }), ); } pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap(); if remove_subscription(&mut subscriptions, id) { true } else { let mut subscriptions = self .subscriptions .gossip_program_subscriptions .write() .unwrap(); remove_subscription(&mut subscriptions, id) } } pub fn add_logs_subscription( &self, address: Option, include_votes: bool, commitment: Option, sub_id: SubscriptionId, subscriber: Subscriber>, ) { let commitment = commitment.unwrap_or_else(CommitmentConfig::confirmed); { let mut subscriptions = if commitment.is_confirmed() { self.subscriptions .gossip_logs_subscriptions .write() .unwrap() } else { self.subscriptions.logs_subscriptions.write().unwrap() }; add_subscription( &mut subscriptions, (address, include_votes), commitment, sub_id, subscriber, 0, // last_notified_slot is not utilized for logs subscriptions None, ); } self.update_bank_transaction_log_keys(); } pub fn remove_logs_subscription(&self, id: &SubscriptionId) -> bool { let mut removed = { let mut subscriptions = self.subscriptions.logs_subscriptions.write().unwrap(); remove_subscription(&mut subscriptions, id) }; if !removed { removed = { let mut subscriptions = self .subscriptions .gossip_logs_subscriptions .write() .unwrap(); remove_subscription(&mut subscriptions, id) }; } if removed { self.update_bank_transaction_log_keys(); } removed } fn update_bank_transaction_log_keys(&self) { // Grab a write lock for both `logs_subscriptions` and `gossip_logs_subscriptions`, to // ensure `Bank::transaction_log_collector_config` is updated atomically. let logs_subscriptions = self.subscriptions.logs_subscriptions.write().unwrap(); let gossip_logs_subscriptions = self .subscriptions .gossip_logs_subscriptions .write() .unwrap(); let mut config = TransactionLogCollectorConfig::default(); let mut all = false; let mut all_with_votes = false; let mut mentioned_address = false; for (address, with_votes) in logs_subscriptions .keys() .chain(gossip_logs_subscriptions.keys()) { match address { None => { if *with_votes { all_with_votes = true; } else { all = true; } } Some(address) => { config.mentioned_addresses.insert(*address); mentioned_address = true; } } } config.filter = if all_with_votes { TransactionLogCollectorFilter::AllWithVotes } else if all { TransactionLogCollectorFilter::All } else if mentioned_address { TransactionLogCollectorFilter::OnlyMentionedAddresses } else { TransactionLogCollectorFilter::None }; *self .bank_forks .read() .unwrap() .root_bank() .transaction_log_collector_config .write() .unwrap() = config; } pub fn add_signature_subscription( &self, signature: Signature, signature_subscribe_config: Option, sub_id: SubscriptionId, subscriber: Subscriber>, ) { let (commitment, enable_received_notification) = signature_subscribe_config .map(|config| (config.commitment, config.enable_received_notification)) .unwrap_or_default(); let commitment = commitment.unwrap_or_else(CommitmentConfig::confirmed); let mut subscriptions = if commitment.is_confirmed() { self.subscriptions .gossip_signature_subscriptions .write() .unwrap() } else { self.subscriptions.signature_subscriptions.write().unwrap() }; add_subscription( &mut subscriptions, signature, commitment, sub_id, subscriber, 0, // last_notified_slot is not utilized for signature subscriptions enable_received_notification, ); } pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap(); if remove_subscription(&mut subscriptions, id) { true } else { let mut subscriptions = self .subscriptions .gossip_signature_subscriptions .write() .unwrap(); remove_subscription(&mut subscriptions, id) } } /// Notify subscribers of changes to any accounts or new signatures since /// the bank's last checkpoint. pub fn notify_subscribers(&self, commitment_slots: CommitmentSlots) { self.enqueue_notification(NotificationEntry::Bank(commitment_slots)); } /// Notify Confirmed commitment-level subscribers of changes to any accounts or new /// signatures. pub fn notify_gossip_subscribers(&self, slot: Slot) { self.enqueue_notification(NotificationEntry::Gossip(slot)); } pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap(); subscriptions.insert(sub_id, sink); } pub fn remove_slot_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap(); subscriptions.remove(id).is_some() } pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) { self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root })); } pub fn notify_signatures_received(&self, slot_signatures: (Slot, Vec)) { self.enqueue_notification(NotificationEntry::SignaturesReceived(slot_signatures)); } pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { if self.enable_vote_subscription { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap(); subscriptions.insert(sub_id, sink); } else { let _ = subscriber.reject(jsonrpc_core::Error::new( jsonrpc_core::ErrorCode::MethodNotFound, )); } } pub fn remove_vote_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap(); subscriptions.remove(id).is_some() } pub fn notify_vote(&self, vote: &Vote) { self.enqueue_notification(NotificationEntry::Vote(vote.clone())); } pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap(); subscriptions.insert(sub_id, sink); } pub fn remove_root_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap(); subscriptions.remove(id).is_some() } pub fn notify_roots(&self, mut rooted_slots: Vec) { rooted_slots.sort_unstable(); rooted_slots.into_iter().for_each(|root| { self.enqueue_notification(NotificationEntry::Root(root)); }); } fn enqueue_notification(&self, notification_entry: NotificationEntry) { match self .notification_sender .lock() .unwrap() .send(notification_entry) { Ok(()) => (), Err(SendError(notification)) => { warn!( "Dropped RPC Notification - receiver disconnected : {:?}", notification ); } } } fn process_notifications( exit: Arc, notifier: RpcNotifier, notification_receiver: Receiver, subscriptions: Subscriptions, bank_forks: Arc>, ) { loop { if exit.load(Ordering::Relaxed) { break; } match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) { Ok(notification_entry) => match notification_entry { NotificationEntry::Slot(slot_info) => { let subscriptions = subscriptions.slot_subscriptions.read().unwrap(); let num_subscriptions = subscriptions.len(); if num_subscriptions > 0 { debug!( "slot notify: {:?}, num_subscriptions: {:?}", slot_info, num_subscriptions ); } for (_, sink) in subscriptions.iter() { inc_new_counter_info!("rpc-subscription-notify-slot", 1); notifier.notify(slot_info, sink); } } // These notifications are only triggered by votes observed on gossip, // unlike `NotificationEntry::Gossip`, which also accounts for slots seen // in VoteState's from bank states built in ReplayStage. NotificationEntry::Vote(ref vote_info) => { let subscriptions = subscriptions.vote_subscriptions.read().unwrap(); let num_subscriptions = subscriptions.len(); if num_subscriptions > 0 { debug!( "vote notify: {:?}, num_subscriptions: {:?}", vote_info, num_subscriptions ); } for (_, sink) in subscriptions.iter() { inc_new_counter_info!("rpc-subscription-notify-vote", 1); notifier.notify( RpcVote { slots: vote_info.slots.clone(), hash: bs58::encode(vote_info.hash).into_string(), timestamp: vote_info.timestamp, }, sink, ); } } NotificationEntry::Root(root) => { let subscriptions = subscriptions.root_subscriptions.read().unwrap(); let num_subscriptions = subscriptions.len(); if num_subscriptions > 0 { debug!( "root notify: {:?}, num_subscriptions: {:?}", root, num_subscriptions ); } for (_, sink) in subscriptions.iter() { inc_new_counter_info!("rpc-subscription-notify-root", 1); notifier.notify(root, sink); } } NotificationEntry::Bank(commitment_slots) => { RpcSubscriptions::notify_accounts_logs_programs_signatures( &subscriptions.account_subscriptions, &subscriptions.logs_subscriptions, &subscriptions.program_subscriptions, &subscriptions.signature_subscriptions, &bank_forks, &commitment_slots, ¬ifier, "bank", ) } NotificationEntry::Gossip(slot) => { Self::process_gossip_notification( slot, ¬ifier, &subscriptions, &bank_forks, ); } NotificationEntry::SignaturesReceived(slot_signatures) => { RpcSubscriptions::process_signatures_received( &slot_signatures, &subscriptions.signature_subscriptions, ¬ifier, ) } }, Err(RecvTimeoutError::Timeout) => { // not a problem - try reading again } Err(RecvTimeoutError::Disconnected) => { warn!("RPC Notification thread - sender disconnected"); break; } } } } fn process_gossip_notification( slot: Slot, notifier: &RpcNotifier, subscriptions: &Subscriptions, bank_forks: &Arc>, ) { let commitment_slots = CommitmentSlots { highest_confirmed_slot: slot, ..CommitmentSlots::default() }; RpcSubscriptions::notify_accounts_logs_programs_signatures( &subscriptions.gossip_account_subscriptions, &subscriptions.gossip_logs_subscriptions, &subscriptions.gossip_program_subscriptions, &subscriptions.gossip_signature_subscriptions, bank_forks, &commitment_slots, ¬ifier, "gossip", ); } fn notify_accounts_logs_programs_signatures( account_subscriptions: &Arc, logs_subscriptions: &Arc, program_subscriptions: &Arc, signature_subscriptions: &Arc, bank_forks: &Arc>, commitment_slots: &CommitmentSlots, notifier: &RpcNotifier, source: &'static str, ) { let mut accounts_time = Measure::start("accounts"); let pubkeys: Vec<_> = { let subs = account_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; let mut num_pubkeys_notified = 0; for pubkey in &pubkeys { num_pubkeys_notified += Self::check_account( pubkey, bank_forks, account_subscriptions.clone(), ¬ifier, &commitment_slots, ) .len(); } accounts_time.stop(); let mut logs_time = Measure::start("logs"); let logs: Vec<_> = { let subs = logs_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; let mut num_logs_notified = 0; for address in &logs { num_logs_notified += Self::check_logs( address, bank_forks, logs_subscriptions.clone(), ¬ifier, &commitment_slots, ) .len(); } logs_time.stop(); let mut programs_time = Measure::start("programs"); let programs: Vec<_> = { let subs = program_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; let mut num_programs_notified = 0; for program_id in &programs { num_programs_notified += Self::check_program( program_id, bank_forks, program_subscriptions.clone(), ¬ifier, &commitment_slots, ) .len(); } programs_time.stop(); let mut signatures_time = Measure::start("signatures"); let signatures: Vec<_> = { let subs = signature_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; let mut num_signatures_notified = 0; for signature in &signatures { num_signatures_notified += Self::check_signature( signature, bank_forks, signature_subscriptions.clone(), ¬ifier, &commitment_slots, ) .len(); } signatures_time.stop(); let total_notified = num_pubkeys_notified + num_programs_notified + num_signatures_notified; let total_ms = accounts_time.as_ms() + programs_time.as_ms() + signatures_time.as_ms(); if total_notified > 0 || total_ms > 10 { debug!( "notified({}): accounts: {} / {} ({}) programs: {} / {} ({}) signatures: {} / {} ({})", source, pubkeys.len(), num_pubkeys_notified, accounts_time, programs.len(), num_programs_notified, programs_time, signatures.len(), num_signatures_notified, signatures_time, ); inc_new_counter_info!("rpc-subscription-notify-bank-or-gossip", total_notified); datapoint_info!( "rpc_subscriptions", ("source", source.to_string(), String), ("num_account_subscriptions", pubkeys.len(), i64), ("num_account_pubkeys_notified", num_pubkeys_notified, i64), ("accounts_time", accounts_time.as_us() as i64, i64), ("num_logs_subscriptions", logs.len(), i64), ("num_logs_notified", num_logs_notified, i64), ("logs_time", logs_time.as_us() as i64, i64), ("num_program_subscriptions", programs.len(), i64), ("num_programs_notified", num_programs_notified, i64), ("programs_time", programs_time.as_us() as i64, i64), ("num_signature_subscriptions", signatures.len(), i64), ("num_signatures_notified", num_signatures_notified, i64), ("signatures_time", signatures_time.as_us() as i64, i64) ); } } fn process_signatures_received( (received_slot, signatures): &(Slot, Vec), signature_subscriptions: &Arc, notifier: &RpcNotifier, ) { for signature in signatures { if let Some(hashmap) = signature_subscriptions.read().unwrap().get(signature) { for ( _, SubscriptionData { sink, config: is_received_notification_enabled, .. }, ) in hashmap.iter() { if is_received_notification_enabled.unwrap_or_default() { notifier.notify( Response { context: RpcResponseContext { slot: *received_slot, }, value: RpcSignatureResult::ReceivedSignature( ReceivedSignatureResult::ReceivedSignature, ), }, &sink, ); } } } } } fn shutdown(&mut self) -> std::thread::Result<()> { if self.t_cleanup.is_some() { info!("RPC Notification thread - shutting down"); self.exit.store(true, Ordering::Relaxed); let x = self.t_cleanup.take().unwrap().join(); info!("RPC Notification thread - shut down."); x } else { warn!("RPC Notification thread - already shut down."); Ok(()) } } } #[cfg(test)] pub(crate) mod tests { use super::*; use crate::optimistically_confirmed_bank_tracker::{ BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, }; use jsonrpc_core::futures::StreamExt; use jsonrpc_pubsub::typed::Subscriber; use serial_test::serial; use solana_runtime::{ commitment::BlockCommitment, genesis_utils::{create_genesis_config, GenesisConfigInfo}, }; use solana_sdk::{ message::Message, signature::{Keypair, Signer}, system_instruction, system_program, system_transaction, transaction::Transaction, }; use std::{fmt::Debug, sync::mpsc::channel}; use tokio::{ runtime::Runtime, time::{sleep, timeout}, }; pub(crate) fn robust_poll_or_panic( receiver: jsonrpc_core::futures::channel::mpsc::UnboundedReceiver, ) -> ( T, jsonrpc_core::futures::channel::mpsc::UnboundedReceiver, ) { let (inner_sender, inner_receiver) = channel(); let rt = Runtime::new().unwrap(); rt.spawn(async move { let result = timeout( Duration::from_millis(RECEIVE_DELAY_MILLIS), receiver.into_future(), ) .await .unwrap_or_else(|err| panic!("stream error {:?}", err)); match result { (Some(value), receiver) => { inner_sender.send((value, receiver)).expect("send error") } (None, _) => panic!("unexpected end of stream"), } sleep(Duration::from_millis(RECEIVE_DELAY_MILLIS * 2)).await; }); inner_receiver.recv().expect("recv error") } #[test] #[serial] fn test_check_account_subscribe() { let GenesisConfigInfo { genesis_config, mint_keypair, .. } = create_genesis_config(100); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); let alice = Keypair::new(); let (create_sub, _id_receiver, create_recv) = Subscriber::new_test("accountNotification"); let (close_sub, _id_receiver, close_recv) = Subscriber::new_test("accountNotification"); let create_sub_id = SubscriptionId::Number(0); let close_sub_id = SubscriptionId::Number(1); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new( &exit, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, ))), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), ); subscriptions.add_account_subscription( alice.pubkey(), Some(RpcAccountInfoConfig { commitment: Some(CommitmentConfig::processed()), encoding: None, data_slice: None, }), create_sub_id.clone(), create_sub, ); assert!(subscriptions .subscriptions .account_subscriptions .read() .unwrap() .contains_key(&alice.pubkey())); let tx = system_transaction::create_account( &mint_keypair, &alice, blockhash, 1, 0, &system_program::id(), ); bank_forks .write() .unwrap() .get(1) .unwrap() .process_transaction(&tx) .unwrap(); let commitment_slots = CommitmentSlots { slot: 1, ..CommitmentSlots::default() }; subscriptions.notify_subscribers(commitment_slots); let (response, _) = robust_poll_or_panic(create_recv); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", "params": { "result": { "context": { "slot": 1 }, "value": { "data": "", "executable": false, "lamports": 1, "owner": "11111111111111111111111111111111", "rentEpoch": 0, }, }, "subscription": 0, } }); assert_eq!(serde_json::to_string(&expected).unwrap(), response); subscriptions.remove_account_subscription(&create_sub_id); subscriptions.add_account_subscription( alice.pubkey(), Some(RpcAccountInfoConfig { commitment: Some(CommitmentConfig::processed()), encoding: None, data_slice: None, }), close_sub_id.clone(), close_sub, ); let tx = { let instruction = system_instruction::transfer(&alice.pubkey(), &mint_keypair.pubkey(), 1); let message = Message::new(&[instruction], Some(&mint_keypair.pubkey())); Transaction::new(&[&alice, &mint_keypair], message, blockhash) }; bank_forks .write() .unwrap() .get(1) .unwrap() .process_transaction(&tx) .unwrap(); subscriptions.notify_subscribers(commitment_slots); let (response, _) = robust_poll_or_panic(close_recv); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", "params": { "result": { "context": { "slot": 1 }, "value": { "data": "", "executable": false, "lamports": 0, "owner": "11111111111111111111111111111111", "rentEpoch": 0, }, }, "subscription": 1, } }); assert_eq!(serde_json::to_string(&expected).unwrap(), response); subscriptions.remove_account_subscription(&close_sub_id); assert!(!subscriptions .subscriptions .account_subscriptions .read() .unwrap() .contains_key(&alice.pubkey())); } #[test] #[serial] fn test_check_program_subscribe() { let GenesisConfigInfo { genesis_config, mint_keypair, .. } = create_genesis_config(100); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let alice = Keypair::new(); let tx = system_transaction::create_account( &mint_keypair, &alice, blockhash, 1, 16, &solana_stake_program::id(), ); bank_forks .write() .unwrap() .get(0) .unwrap() .process_transaction(&tx) .unwrap(); let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("programNotification"); let sub_id = SubscriptionId::Number(0); let exit = Arc::new(AtomicBool::new(false)); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = RpcSubscriptions::new( &exit, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, ); subscriptions.add_program_subscription( solana_stake_program::id(), Some(RpcProgramAccountsConfig { account_config: RpcAccountInfoConfig { commitment: Some(CommitmentConfig::processed()), ..RpcAccountInfoConfig::default() }, ..RpcProgramAccountsConfig::default() }), sub_id.clone(), subscriber, ); assert!(subscriptions .subscriptions .program_subscriptions .read() .unwrap() .contains_key(&solana_stake_program::id())); subscriptions.notify_subscribers(CommitmentSlots::default()); let (response, _) = robust_poll_or_panic(transport_receiver); let expected = json!({ "jsonrpc": "2.0", "method": "programNotification", "params": { "result": { "context": { "slot": 0 }, "value": { "account": { "data": "1111111111111111", "executable": false, "lamports": 1, "owner": "Stake11111111111111111111111111111111111111", "rentEpoch": 0, }, "pubkey": alice.pubkey().to_string(), }, }, "subscription": 0, } }); assert_eq!(serde_json::to_string(&expected).unwrap(), response); subscriptions.remove_program_subscription(&sub_id); assert!(!subscriptions .subscriptions .program_subscriptions .read() .unwrap() .contains_key(&solana_stake_program::id())); } #[test] #[serial] fn test_check_signature_subscribe() { let GenesisConfigInfo { genesis_config, mint_keypair, .. } = create_genesis_config(100); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let mut bank_forks = BankForks::new(bank); let alice = Keypair::new(); let past_bank_tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, blockhash); let unprocessed_tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, blockhash); let processed_tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 3, blockhash); bank_forks .get(0) .unwrap() .process_transaction(&past_bank_tx) .unwrap(); let next_bank = Bank::new_from_parent( &bank_forks.banks[&0].clone(), &solana_sdk::pubkey::new_rand(), 1, ); bank_forks.insert(next_bank); bank_forks .get(1) .unwrap() .process_transaction(&processed_tx) .unwrap(); let bank1 = bank_forks[1].clone(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let mut cache0 = BlockCommitment::default(); cache0.increase_confirmation_stake(1, 10); let cache1 = BlockCommitment::default(); let mut block_commitment = HashMap::new(); block_commitment.entry(0).or_insert(cache0); block_commitment.entry(1).or_insert(cache1); let block_commitment_cache = BlockCommitmentCache::new( block_commitment, 10, CommitmentSlots { slot: bank1.slot(), ..CommitmentSlots::default() }, ); let exit = Arc::new(AtomicBool::new(false)); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = RpcSubscriptions::new( &exit, bank_forks, Arc::new(RwLock::new(block_commitment_cache)), optimistically_confirmed_bank, ); let (past_bank_sub1, _id_receiver, past_bank_recv1) = Subscriber::new_test("signatureNotification"); let (past_bank_sub2, _id_receiver, past_bank_recv2) = Subscriber::new_test("signatureNotification"); let (processed_sub, _id_receiver, processed_recv) = Subscriber::new_test("signatureNotification"); let (processed_sub3, _id_receiver, processed_recv3) = Subscriber::new_test("signatureNotification"); subscriptions.add_signature_subscription( past_bank_tx.signatures[0], Some(RpcSignatureSubscribeConfig { commitment: Some(CommitmentConfig::processed()), enable_received_notification: Some(false), }), SubscriptionId::Number(1), past_bank_sub1, ); subscriptions.add_signature_subscription( past_bank_tx.signatures[0], Some(RpcSignatureSubscribeConfig { commitment: Some(CommitmentConfig::finalized()), enable_received_notification: Some(false), }), SubscriptionId::Number(2), past_bank_sub2, ); subscriptions.add_signature_subscription( processed_tx.signatures[0], Some(RpcSignatureSubscribeConfig { commitment: Some(CommitmentConfig::processed()), enable_received_notification: Some(false), }), SubscriptionId::Number(3), processed_sub, ); subscriptions.add_signature_subscription( unprocessed_tx.signatures[0], Some(RpcSignatureSubscribeConfig { commitment: Some(CommitmentConfig::processed()), enable_received_notification: Some(false), }), SubscriptionId::Number(4), Subscriber::new_test("signatureNotification").0, ); // Add a subscription that gets `received` notifications subscriptions.add_signature_subscription( unprocessed_tx.signatures[0], Some(RpcSignatureSubscribeConfig { commitment: Some(CommitmentConfig::processed()), enable_received_notification: Some(true), }), SubscriptionId::Number(5), processed_sub3, ); { let sig_subs = subscriptions .subscriptions .signature_subscriptions .read() .unwrap(); assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 2); assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0])); assert!(sig_subs.contains_key(&processed_tx.signatures[0])); } let mut commitment_slots = CommitmentSlots::default(); let received_slot = 1; commitment_slots.slot = received_slot; subscriptions .notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]])); subscriptions.notify_subscribers(commitment_slots); let expected_res = RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None }); let received_expected_res = RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature); struct Notification { slot: Slot, id: u64, } let expected_notification = |exp: Notification, expected_res: &RpcSignatureResult| -> String { let json = json!({ "jsonrpc": "2.0", "method": "signatureNotification", "params": { "result": { "context": { "slot": exp.slot }, "value": expected_res, }, "subscription": exp.id, } }); serde_json::to_string(&json).unwrap() }; // Expect to receive a notification from bank 1 because this subscription is // looking for 0 confirmations and so checks the current bank let expected = expected_notification(Notification { slot: 1, id: 1 }, &expected_res); let (response, _) = robust_poll_or_panic(past_bank_recv1); assert_eq!(expected, response); // Expect to receive a notification from bank 0 because this subscription is // looking for 1 confirmation and so checks the past bank let expected = expected_notification(Notification { slot: 0, id: 2 }, &expected_res); let (response, _) = robust_poll_or_panic(past_bank_recv2); assert_eq!(expected, response); let expected = expected_notification(Notification { slot: 1, id: 3 }, &expected_res); let (response, _) = robust_poll_or_panic(processed_recv); assert_eq!(expected, response); // Expect a "received" notification let expected = expected_notification( Notification { slot: received_slot, id: 5, }, &received_expected_res, ); let (response, _) = robust_poll_or_panic(processed_recv3); assert_eq!(expected, response); // Subscription should be automatically removed after notification let sig_subs = subscriptions .subscriptions .signature_subscriptions .read() .unwrap(); assert!(!sig_subs.contains_key(&processed_tx.signatures[0])); assert!(!sig_subs.contains_key(&past_bank_tx.signatures[0])); // Unprocessed signature subscription should not be removed assert_eq!( sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(), 2 ); } #[test] #[serial] fn test_check_slot_subscribe() { let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("slotNotification"); let sub_id = SubscriptionId::Number(0); let exit = Arc::new(AtomicBool::new(false)); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = RpcSubscriptions::new( &exit, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, ); subscriptions.add_slot_subscription(sub_id.clone(), subscriber); assert!(subscriptions .subscriptions .slot_subscriptions .read() .unwrap() .contains_key(&sub_id)); subscriptions.notify_slot(0, 0, 0); let (response, _) = robust_poll_or_panic(transport_receiver); let expected_res = SlotInfo { parent: 0, slot: 0, root: 0, }; let expected_res_str = serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); let expected = format!( r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str ); assert_eq!(expected, response); subscriptions.remove_slot_subscription(&sub_id); assert!(!subscriptions .subscriptions .slot_subscriptions .read() .unwrap() .contains_key(&sub_id)); } #[test] #[serial] fn test_check_root_subscribe() { let (subscriber, _id_receiver, mut transport_receiver) = Subscriber::new_test("rootNotification"); let sub_id = SubscriptionId::Number(0); let exit = Arc::new(AtomicBool::new(false)); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = RpcSubscriptions::new( &exit, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, ); subscriptions.add_root_subscription(sub_id.clone(), subscriber); assert!(subscriptions .subscriptions .root_subscriptions .read() .unwrap() .contains_key(&sub_id)); subscriptions.notify_roots(vec![2, 1, 3]); for expected_root in 1..=3 { let (response, receiver) = robust_poll_or_panic(transport_receiver); transport_receiver = receiver; let expected_res_str = serde_json::to_string(&serde_json::to_value(expected_root).unwrap()).unwrap(); let expected = format!( r#"{{"jsonrpc":"2.0","method":"rootNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str ); assert_eq!(expected, response); } subscriptions.remove_root_subscription(&sub_id); assert!(!subscriptions .subscriptions .root_subscriptions .read() .unwrap() .contains_key(&sub_id)); } #[test] #[serial] fn test_add_and_remove_subscription() { let mut subscriptions: HashMap>> = HashMap::new(); let commitment = CommitmentConfig::confirmed(); let num_keys = 5; for key in 0..num_keys { let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("notification"); let sub_id = SubscriptionId::Number(key); add_subscription( &mut subscriptions, key, commitment, sub_id, subscriber, 0, None, ); } // Add another subscription to the "0" key let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("notification"); let extra_sub_id = SubscriptionId::Number(num_keys); add_subscription( &mut subscriptions, 0, commitment, extra_sub_id.clone(), subscriber, 0, None, ); assert_eq!(subscriptions.len(), num_keys as usize); assert_eq!(subscriptions.get(&0).unwrap().len(), 2); assert_eq!(subscriptions.get(&1).unwrap().len(), 1); assert_eq!( remove_subscription(&mut subscriptions, &SubscriptionId::Number(0)), true ); assert_eq!(subscriptions.len(), num_keys as usize); assert_eq!(subscriptions.get(&0).unwrap().len(), 1); assert_eq!( remove_subscription(&mut subscriptions, &SubscriptionId::Number(0)), false ); assert_eq!(remove_subscription(&mut subscriptions, &extra_sub_id), true); assert_eq!(subscriptions.len(), (num_keys - 1) as usize); assert!(subscriptions.get(&0).is_none()); } #[test] #[serial] fn test_gossip_separate_account_notifications() { let GenesisConfigInfo { genesis_config, mint_keypair, .. } = create_genesis_config(100); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); let bank2 = Bank::new_from_parent(&bank0, &Pubkey::default(), 2); bank_forks.write().unwrap().insert(bank2); let alice = Keypair::new(); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let mut pending_optimistically_confirmed_banks = HashSet::new(); let (subscriber0, _id_receiver, transport_receiver0) = Subscriber::new_test("accountNotification"); let (subscriber1, _id_receiver, transport_receiver1) = Subscriber::new_test("accountNotification"); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = Arc::new(RpcSubscriptions::new( &exit, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, ))), optimistically_confirmed_bank.clone(), )); let sub_id0 = SubscriptionId::Number(0); subscriptions.add_account_subscription( alice.pubkey(), Some(RpcAccountInfoConfig { commitment: Some(CommitmentConfig::confirmed()), encoding: None, data_slice: None, }), sub_id0.clone(), subscriber0, ); assert!(subscriptions .subscriptions .gossip_account_subscriptions .read() .unwrap() .contains_key(&alice.pubkey())); let tx = system_transaction::create_account( &mint_keypair, &alice, blockhash, 1, 16, &solana_stake_program::id(), ); // Add the transaction to the 1st bank and then freeze the bank let bank1 = bank_forks.write().unwrap().get(1).cloned().unwrap(); bank1.process_transaction(&tx).unwrap(); bank1.freeze(); // Add the same transaction to the unfrozen 2nd bank bank_forks .write() .unwrap() .get(2) .unwrap() .process_transaction(&tx) .unwrap(); // First, notify the unfrozen bank first to queue pending notification OptimisticallyConfirmedBankTracker::process_notification( BankNotification::OptimisticallyConfirmed(2), &bank_forks, &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, ); // Now, notify the frozen bank and ensure its notifications are processed OptimisticallyConfirmedBankTracker::process_notification( BankNotification::OptimisticallyConfirmed(1), &bank_forks, &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, ); let (response, _) = robust_poll_or_panic(transport_receiver0); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", "params": { "result": { "context": { "slot": 1 }, "value": { "data": "1111111111111111", "executable": false, "lamports": 1, "owner": "Stake11111111111111111111111111111111111111", "rentEpoch": 0, }, }, "subscription": 0, } }); assert_eq!(serde_json::to_string(&expected).unwrap(), response); subscriptions.remove_account_subscription(&sub_id0); let sub_id1 = SubscriptionId::Number(1); subscriptions.add_account_subscription( alice.pubkey(), Some(RpcAccountInfoConfig { commitment: Some(CommitmentConfig::confirmed()), encoding: None, data_slice: None, }), sub_id1.clone(), subscriber1, ); let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); OptimisticallyConfirmedBankTracker::process_notification( BankNotification::Frozen(bank2), &bank_forks, &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, ); let (response, _) = robust_poll_or_panic(transport_receiver1); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", "params": { "result": { "context": { "slot": 2 }, "value": { "data": "1111111111111111", "executable": false, "lamports": 1, "owner": "Stake11111111111111111111111111111111111111", "rentEpoch": 0, }, }, "subscription": 1, } }); assert_eq!(serde_json::to_string(&expected).unwrap(), response); subscriptions.remove_account_subscription(&sub_id1); assert!(!subscriptions .subscriptions .gossip_account_subscriptions .read() .unwrap() .contains_key(&alice.pubkey())); } #[test] fn test_total_nested_subscriptions() { let mock_subscriptions = RwLock::new(HashMap::new()); assert_eq!(total_nested_subscriptions(&mock_subscriptions), 0); mock_subscriptions .write() .unwrap() .insert(0, HashMap::new()); assert_eq!(total_nested_subscriptions(&mock_subscriptions), 0); mock_subscriptions .write() .unwrap() .entry(0) .and_modify(|map| { map.insert(0, "test"); }); assert_eq!(total_nested_subscriptions(&mock_subscriptions), 1); mock_subscriptions .write() .unwrap() .entry(0) .and_modify(|map| { map.insert(1, "test"); }); assert_eq!(total_nested_subscriptions(&mock_subscriptions), 2); mock_subscriptions .write() .unwrap() .insert(1, HashMap::new()); assert_eq!(total_nested_subscriptions(&mock_subscriptions), 2); mock_subscriptions .write() .unwrap() .entry(1) .and_modify(|map| { map.insert(0, "test"); }); assert_eq!(total_nested_subscriptions(&mock_subscriptions), 3); } #[test] fn test_total_subscriptions() { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100); let bank = Bank::new(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let subscriptions = RpcSubscriptions::default_with_bank_forks(bank_forks); let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("accountNotification"); let account_sub_id = SubscriptionId::Number(0u64); subscriptions.add_account_subscription( Pubkey::default(), None, account_sub_id.clone(), subscriber, ); assert_eq!(subscriptions.total(), 1); let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("programNotification"); let program_sub_id = SubscriptionId::Number(1u64); subscriptions.add_program_subscription( Pubkey::default(), None, program_sub_id.clone(), subscriber, ); assert_eq!(subscriptions.total(), 2); let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("logsNotification"); let logs_sub_id = SubscriptionId::Number(2u64); subscriptions.add_logs_subscription(None, false, None, logs_sub_id.clone(), subscriber); assert_eq!(subscriptions.total(), 3); let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("signatureNotification"); let sig_sub_id = SubscriptionId::Number(3u64); subscriptions.add_signature_subscription( Signature::default(), None, sig_sub_id.clone(), subscriber, ); assert_eq!(subscriptions.total(), 4); let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("slotNotification"); let slot_sub_id = SubscriptionId::Number(4u64); subscriptions.add_slot_subscription(slot_sub_id.clone(), subscriber); assert_eq!(subscriptions.total(), 5); let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("voteNotification"); let vote_sub_id = SubscriptionId::Number(5u64); subscriptions.add_vote_subscription(vote_sub_id.clone(), subscriber); assert_eq!(subscriptions.total(), 6); let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("rootNotification"); let root_sub_id = SubscriptionId::Number(6u64); subscriptions.add_root_subscription(root_sub_id.clone(), subscriber); assert_eq!(subscriptions.total(), 7); // Add duplicate account subscription to ensure totals include all subscriptions on all keys let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("accountNotification2"); let account_dupe_sub_id = SubscriptionId::Number(7u64); subscriptions.add_account_subscription( Pubkey::default(), None, account_dupe_sub_id.clone(), subscriber, ); assert_eq!(subscriptions.total(), 8); subscriptions.remove_account_subscription(&account_sub_id); assert_eq!(subscriptions.total(), 7); subscriptions.remove_account_subscription(&account_dupe_sub_id); assert_eq!(subscriptions.total(), 6); subscriptions.remove_program_subscription(&program_sub_id); assert_eq!(subscriptions.total(), 5); subscriptions.remove_logs_subscription(&logs_sub_id); assert_eq!(subscriptions.total(), 4); subscriptions.remove_signature_subscription(&sig_sub_id); assert_eq!(subscriptions.total(), 3); subscriptions.remove_slot_subscription(&slot_sub_id); assert_eq!(subscriptions.total(), 2); subscriptions.remove_vote_subscription(&vote_sub_id); assert_eq!(subscriptions.total(), 1); subscriptions.remove_root_subscription(&root_sub_id); assert_eq!(subscriptions.total(), 0); } }