diff --git a/core/src/commitment.rs b/core/src/commitment.rs index ad4062d78..427a7b64d 100644 --- a/core/src/commitment.rs +++ b/core/src/commitment.rs @@ -162,6 +162,24 @@ impl BlockCommitmentCache { } } + #[cfg(test)] + pub fn new_for_tests_with_blockstore_bank( + blockstore: Arc, + bank: Arc, + root: Slot, + ) -> Self { + let mut block_commitment: HashMap = HashMap::new(); + block_commitment.insert(0, BlockCommitment::default()); + Self { + block_commitment, + blockstore, + total_stake: 42, + largest_confirmed_root: root, + bank, + root, + } + } + #[cfg(test)] pub(crate) fn set_get_largest_confirmed_root(&mut self, root: Slot) { self.largest_confirmed_root = root; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 08ad1615f..38b150ee9 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -308,10 +308,8 @@ impl ReplayStage { // Vote on a fork if let Some(ref vote_bank) = vote_bank { - subscriptions.notify_subscribers( - block_commitment_cache.read().unwrap().slot(), - &bank_forks, - ); + subscriptions + .notify_subscribers(block_commitment_cache.read().unwrap().slot()); if let Some(votable_leader) = leader_schedule_cache.slot_leader_at(vote_bank.slot(), Some(vote_bank)) { @@ -2060,12 +2058,6 @@ pub(crate) mod tests { ); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = Arc::new(RpcSubscriptions::new( - &exit, - Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( - blockstore.clone(), - ))), - )); let mut bank_forks = BankForks::new(0, bank0); // Insert a non-root bank so that the propagation logic will update this @@ -2089,7 +2081,14 @@ pub(crate) mod tests { assert!(progress.get_propagated_stats(1).unwrap().is_leader_slot); bank1.freeze(); bank_forks.insert(bank1); - let bank_forks = RwLock::new(bank_forks); + let bank_forks = Arc::new(RwLock::new(bank_forks)); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( + blockstore.clone(), + ))), + )); // Insert shreds for slot NUM_CONSECUTIVE_LEADER_SLOTS, // chaining to slot 1 diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index cb9d5d465..2f3f0503e 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -1,6 +1,6 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions, SlotInfo}; +use crate::rpc_subscriptions::{RpcSubscriptions, SlotInfo}; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; @@ -8,8 +8,12 @@ use solana_client::rpc_response::{ Response as RpcResponse, RpcAccount, RpcKeyedAccount, RpcSignatureResult, }; #[cfg(test)] -use solana_ledger::blockstore::Blockstore; -use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; +use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore}; +use solana_sdk::{ + clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature, +}; +#[cfg(test)] +use std::sync::RwLock; use std::{ str::FromStr, sync::{atomic, Arc}, @@ -35,7 +39,7 @@ pub trait RpcSolPubSub { meta: Self::Metadata, subscriber: Subscriber>, pubkey_str: String, - confirmations: Option, + commitment: Option, ); // Unsubscribe from account notification subscription. @@ -59,7 +63,7 @@ pub trait RpcSolPubSub { meta: Self::Metadata, subscriber: Subscriber>, pubkey_str: String, - confirmations: Option, + commitment: Option, ); // Unsubscribe from account notification subscription. @@ -83,7 +87,7 @@ pub trait RpcSolPubSub { meta: Self::Metadata, subscriber: Subscriber>, signature_str: String, - confirmations: Option, + commitment: Option, ); // Unsubscribe from signature notification subscription. @@ -135,9 +139,14 @@ impl RpcSolPubSubImpl { } #[cfg(test)] - fn default_with_blockstore(blockstore: Arc) -> Self { + fn default_with_blockstore_bank_forks( + blockstore: Arc, + bank_forks: Arc>, + ) -> Self { let uid = Arc::new(atomic::AtomicUsize::default()); - let subscriptions = Arc::new(RpcSubscriptions::default_with_blockstore(blockstore)); + let subscriptions = Arc::new(RpcSubscriptions::default_with_blockstore_bank_forks( + blockstore, bank_forks, + )); Self { uid, subscriptions } } } @@ -158,19 +167,15 @@ impl RpcSolPubSub for RpcSolPubSubImpl { _meta: Self::Metadata, subscriber: Subscriber>, pubkey_str: String, - confirmations: Option, + commitment: Option, ) { match param::(&pubkey_str, "pubkey") { Ok(pubkey) => { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id); - self.subscriptions.add_account_subscription( - pubkey, - confirmations, - sub_id, - subscriber, - ) + self.subscriptions + .add_account_subscription(pubkey, commitment, sub_id, subscriber) } Err(e) => subscriber.reject(e).unwrap(), } @@ -198,19 +203,15 @@ impl RpcSolPubSub for RpcSolPubSubImpl { _meta: Self::Metadata, subscriber: Subscriber>, pubkey_str: String, - confirmations: Option, + commitment: Option, ) { match param::(&pubkey_str, "pubkey") { Ok(pubkey) => { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("program_subscribe: account={:?} id={:?}", pubkey, sub_id); - self.subscriptions.add_program_subscription( - pubkey, - confirmations, - sub_id, - subscriber, - ) + self.subscriptions + .add_program_subscription(pubkey, commitment, sub_id, subscriber) } Err(e) => subscriber.reject(e).unwrap(), } @@ -238,7 +239,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { _meta: Self::Metadata, subscriber: Subscriber>, signature_str: String, - confirmations: Option, + commitment: Option, ) { info!("signature_subscribe"); match param::(&signature_str, "signature") { @@ -249,12 +250,8 @@ impl RpcSolPubSub for RpcSolPubSubImpl { "signature_subscribe: signature={:?} id={:?}", signature, sub_id ); - self.subscriptions.add_signature_subscription( - signature, - confirmations, - sub_id, - subscriber, - ); + self.subscriptions + .add_signature_subscription(signature, commitment, sub_id, subscriber); } Err(e) => subscriber.reject(e).unwrap(), } @@ -354,14 +351,15 @@ mod tests { bank_forks: &Arc>, tx: &Transaction, subscriptions: &RpcSubscriptions, + slot: Slot, ) -> transaction::Result<()> { bank_forks .write() .unwrap() - .get(0) + .get(slot) .unwrap() .process_transaction(tx)?; - subscriptions.notify_subscribers(0, &bank_forks); + subscriptions.notify_subscribers(slot); Ok(()) } @@ -387,6 +385,7 @@ mod tests { let rpc = RpcSolPubSubImpl { subscriptions: Arc::new(RpcSubscriptions::new( &Arc::new(AtomicBool::new(false)), + bank_forks.clone(), Arc::new(RwLock::new( BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), )), @@ -401,7 +400,7 @@ mod tests { let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification"); rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None); - process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); + process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 0).unwrap(); // Test signature confirmation notification let (response, _) = robust_poll_or_panic(receiver); @@ -430,15 +429,15 @@ mod tests { } = create_genesis_config(10_000); let bob_pubkey = Pubkey::new_rand(); let bank = Bank::new(&genesis_config); - let arc_bank = Arc::new(bank); - let blockhash = arc_bank.last_blockhash(); + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let session = create_session(); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); + let rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks); io.extend_with(rpc.to_delegate()); let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash); @@ -493,14 +492,22 @@ mod tests { let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let rpc = RpcSolPubSubImpl { subscriptions: Arc::new(RpcSubscriptions::new( &Arc::new(AtomicBool::new(false)), + bank_forks.clone(), Arc::new(RwLock::new( - BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), + BlockCommitmentCache::new_for_tests_with_blockstore_bank( + blockstore, + bank_forks.read().unwrap().get(1).unwrap().clone(), + 1, + ), )), )), uid: Arc::new(atomic::AtomicUsize::default()), @@ -515,7 +522,7 @@ mod tests { ); let tx = system_transaction::transfer(&alice, &contract_funds.pubkey(), 51, blockhash); - process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); + process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 1).unwrap(); let ixs = budget_instruction::when_signed( &contract_funds.pubkey(), @@ -530,14 +537,14 @@ mod tests { &ixs, blockhash, ); - process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); + process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 1).unwrap(); sleep(Duration::from_millis(200)); // Test signature confirmation notification #1 let expected_data = bank_forks .read() .unwrap() - .get(0) + .get(1) .unwrap() .get_account(&contract_state.pubkey()) .unwrap() @@ -547,7 +554,7 @@ mod tests { "method": "accountNotification", "params": { "result": { - "context": { "slot": 0 }, + "context": { "slot": 1 }, "value": { "owner": budget_program_id.to_string(), "lamports": 51, @@ -564,7 +571,7 @@ mod tests { assert_eq!(serde_json::to_string(&expected).unwrap(), response); let tx = system_transaction::transfer(&alice, &witness.pubkey(), 1, blockhash); - process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); + process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 1).unwrap(); sleep(Duration::from_millis(200)); let ix = budget_instruction::apply_signature( &witness.pubkey(), @@ -572,14 +579,14 @@ mod tests { &bob_pubkey, ); let tx = Transaction::new_signed_instructions(&[&witness], &[ix], blockhash); - process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); + process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 1).unwrap(); sleep(Duration::from_millis(200)); assert_eq!( bank_forks .read() .unwrap() - .get(0) + .get(1) .unwrap() .get_account(&contract_state.pubkey()), None @@ -593,9 +600,12 @@ mod tests { let session = create_session(); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, Bank::new(&genesis_config)))); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); + let rpc = + RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks.clone()); io.extend_with(rpc.to_delegate()); @@ -630,7 +640,7 @@ mod tests { #[test] #[should_panic] - fn test_account_confirmations_not_fulfilled() { + fn test_account_commitment_not_fulfilled() { let GenesisConfigInfo { genesis_config, mint_keypair: alice, @@ -638,15 +648,19 @@ mod tests { } = create_genesis_config(10_000); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); - let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let bank_forks = Arc::new(RwLock::new(BankForks::new(1, bank))); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let bob = Keypair::new(); - let mut rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore.clone()); + let mut rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks( + blockstore.clone(), + bank_forks.clone(), + ); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new( &exit, + bank_forks.clone(), Arc::new(RwLock::new( BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), )), @@ -654,24 +668,29 @@ mod tests { rpc.subscriptions = Arc::new(subscriptions); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); - rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); + rpc.account_subscribe( + session, + subscriber, + bob.pubkey().to_string(), + Some(CommitmentConfig::root()), + ); let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); bank_forks .write() .unwrap() - .get(0) + .get(1) .unwrap() .process_transaction(&tx) .unwrap(); - rpc.subscriptions.notify_subscribers(0, &bank_forks); + rpc.subscriptions.notify_subscribers(0); // allow 200ms for notification thread to wake std::thread::sleep(Duration::from_millis(200)); let _panic = robust_poll_or_panic(receiver); } #[test] - fn test_account_confirmations() { + fn test_account_commitment() { let GenesisConfigInfo { genesis_config, mint_keypair: alice, @@ -680,75 +699,65 @@ mod tests { let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let bob = Keypair::new(); - let mut rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore.clone()); + let mut rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks( + blockstore.clone(), + bank_forks.clone(), + ); let exit = Arc::new(AtomicBool::new(false)); let block_commitment_cache = Arc::new(RwLock::new( BlockCommitmentCache::new_for_tests_with_blockstore(blockstore.clone()), )); - let subscriptions = RpcSubscriptions::new(&exit, block_commitment_cache.clone()); + let subscriptions = + RpcSubscriptions::new(&exit, bank_forks.clone(), block_commitment_cache.clone()); rpc.subscriptions = Arc::new(subscriptions); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); - rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); + rpc.account_subscribe( + session, + subscriber, + bob.pubkey().to_string(), + Some(CommitmentConfig::root()), + ); let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); bank_forks .write() .unwrap() - .get(0) + .get(1) .unwrap() .process_transaction(&tx) .unwrap(); - rpc.subscriptions.notify_subscribers(0, &bank_forks); + rpc.subscriptions.notify_subscribers(1); - let bank0 = bank_forks.read().unwrap()[0].clone(); - let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); - bank_forks.write().unwrap().insert(bank1); let bank1 = bank_forks.read().unwrap()[1].clone(); - - let mut cache0 = BlockCommitment::default(); - cache0.increase_confirmation_stake(1, 10); - let mut block_commitment = HashMap::new(); - block_commitment.entry(0).or_insert(cache0.clone()); - let mut new_block_commitment = BlockCommitmentCache::new( - block_commitment, - 0, - 10, - bank1.clone(), - blockstore.clone(), - 0, - ); - let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); - std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); - drop(w_block_commitment_cache); - - rpc.subscriptions.notify_subscribers(1, &bank_forks); let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); bank_forks.write().unwrap().insert(bank2); + bank_forks.write().unwrap().set_root(1, &None, None); let bank2 = bank_forks.read().unwrap()[2].clone(); - let mut cache0 = BlockCommitment::default(); - cache0.increase_confirmation_stake(2, 10); - let mut block_commitment = HashMap::new(); - block_commitment.entry(0).or_insert(cache0.clone()); + let mut block_commitment: HashMap = HashMap::new(); + block_commitment.insert(0, BlockCommitment::default()); let mut new_block_commitment = - BlockCommitmentCache::new(block_commitment, 0, 10, bank2, blockstore.clone(), 0); + BlockCommitmentCache::new(block_commitment, 1, 10, bank2, blockstore.clone(), 1); let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); drop(w_block_commitment_cache); - rpc.subscriptions.notify_subscribers(2, &bank_forks); + rpc.subscriptions.notify_subscribers(2); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", "params": { "result": { - "context": { "slot": 0 }, + "context": { "slot": 1 }, "value": { "owner": system_program::id().to_string(), "lamports": 100, @@ -769,7 +778,10 @@ mod tests { fn test_slot_subscribe() { let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); rpc.slot_subscribe(session, subscriber); @@ -796,7 +808,10 @@ mod tests { fn test_slot_unsubscribe() { let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); rpc.slot_subscribe(session, subscriber); diff --git a/core/src/rpc_pubsub_service.rs b/core/src/rpc_pubsub_service.rs index 47586642c..68e542ae8 100644 --- a/core/src/rpc_pubsub_service.rs +++ b/core/src/rpc_pubsub_service.rs @@ -73,7 +73,13 @@ impl PubSubService { mod tests { use super::*; use crate::commitment::BlockCommitmentCache; - use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; + use solana_ledger::{ + bank_forks::BankForks, + blockstore::Blockstore, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + get_tmp_ledger_path, + }; + use solana_runtime::bank::Bank; use std::{ net::{IpAddr, Ipv4Addr}, sync::RwLock, @@ -85,8 +91,12 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let subscriptions = Arc::new(RpcSubscriptions::new( &exit, + bank_forks, Arc::new(RwLock::new( BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), )), diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 3432542b8..393a1d381 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -14,9 +14,13 @@ use solana_client::rpc_response::{ use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore}; use solana_runtime::bank::Bank; use solana_sdk::{ - account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction, + account::Account, + clock::Slot, + commitment_config::{CommitmentConfig, CommitmentLevel}, + pubkey::Pubkey, + signature::Signature, + transaction, }; -use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use std::sync::{ atomic::{AtomicBool, Ordering}, mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, @@ -24,7 +28,6 @@ use std::sync::{ use std::thread::{Builder, JoinHandle}; use std::time::Duration; use std::{ - cmp::min, collections::{HashMap, HashSet}, iter, sync::{Arc, Mutex, RwLock}, @@ -33,8 +36,6 @@ use tokio::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor}; const RECEIVE_DELAY_MILLIS: u64 = 100; -pub type Confirmations = usize; - #[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub struct SlotInfo { pub slot: Slot, @@ -45,7 +46,7 @@ pub struct SlotInfo { enum NotificationEntry { Slot(SlotInfo), Root(Slot), - Bank((Slot, Arc>)), + Bank(Slot), } impl std::fmt::Debug for NotificationEntry { @@ -53,51 +54,57 @@ impl std::fmt::Debug for NotificationEntry { match self { NotificationEntry::Root(root) => write!(f, "Root({})", root), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), - NotificationEntry::Bank((current_slot, _)) => { + NotificationEntry::Bank(current_slot) => { write!(f, "Bank({{current_slot: {:?}}})", current_slot) } } } } +struct SubscriptionData { + sink: Sink, + commitment: CommitmentConfig, + last_notified_slot: RwLock, +} type RpcAccountSubscriptions = - RwLock>, Confirmations)>>>; -type RpcProgramSubscriptions = RwLock< - HashMap>, Confirmations)>>, ->; + RwLock>>>>; +type RpcProgramSubscriptions = + RwLock>>>>; type RpcSignatureSubscriptions = RwLock< - HashMap< - Signature, - HashMap>, Confirmations)>, - >, + HashMap>>>, >; type RpcSlotSubscriptions = RwLock>>; type RpcRootSubscriptions = RwLock>>; fn add_subscription( - subscriptions: &mut HashMap, Confirmations)>>, + subscriptions: &mut HashMap>>, hashmap_key: K, - confirmations: Option, + commitment: Option, sub_id: SubscriptionId, subscriber: Subscriber, + last_notified_slot: Slot, ) where K: Eq + Hash, S: Clone, { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let confirmations = confirmations.unwrap_or(0); - let confirmations = min(confirmations, MAX_LOCKOUT_HISTORY + 1); + let commitment = commitment.unwrap_or_else(CommitmentConfig::recent); + let subscription_data = SubscriptionData { + sink, + commitment, + last_notified_slot: RwLock::new(last_notified_slot), + }; if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) { - current_hashmap.insert(sub_id, (sink, confirmations)); + current_hashmap.insert(sub_id, subscription_data); return; } let mut hashmap = HashMap::new(); - hashmap.insert(sub_id, (sink, confirmations)); + hashmap.insert(sub_id, subscription_data); subscriptions.insert(hashmap_key, hashmap); } fn remove_subscription( - subscriptions: &mut HashMap, Confirmations)>>, + subscriptions: &mut HashMap>>, sub_id: &SubscriptionId, ) -> bool where @@ -119,8 +126,8 @@ where } #[allow(clippy::type_complexity)] -fn check_confirmations_and_notify( - subscriptions: &HashMap>, Confirmations)>>, +fn check_commitment_and_notify( + subscriptions: &HashMap>>>, hashmap_key: &K, bank_forks: &Arc>, block_commitment_cache: &Arc>, @@ -132,53 +139,50 @@ where K: Eq + Hash + Clone + Copy, S: Clone + Serialize, B: Fn(&Bank, &K) -> X, - F: Fn(X, u64) -> Box>, - X: Clone + Serialize, + F: Fn(X, Slot) -> (Box>, Slot), + X: Clone + Serialize + Default, { - let mut confirmation_slots: HashMap = HashMap::new(); let r_block_commitment_cache = block_commitment_cache.read().unwrap(); let current_slot = r_block_commitment_cache.slot(); - let root = r_block_commitment_cache.root(); - let current_ancestors = bank_forks - .read() - .unwrap() - .get(current_slot) - .unwrap() - .ancestors - .clone(); - for (slot, _) in current_ancestors.iter() { - if let Some(confirmations) = r_block_commitment_cache.get_confirmation_count(*slot) { - confirmation_slots.entry(confirmations).or_insert(*slot); - } - } + let node_root = r_block_commitment_cache.root(); + let largest_confirmed_root = r_block_commitment_cache.largest_confirmed_root(); drop(r_block_commitment_cache); let mut notified_set: HashSet = HashSet::new(); if let Some(hashmap) = subscriptions.get(hashmap_key) { - for (sub_id, (sink, confirmations)) in hashmap.iter() { - let desired_slot = if *confirmations == 0 { - Some(¤t_slot) - } else if *confirmations == MAX_LOCKOUT_HISTORY + 1 { - Some(&root) - } else { - confirmation_slots.get(confirmations) + for ( + sub_id, + SubscriptionData { + sink, + commitment, + last_notified_slot, + }, + ) in hashmap.iter() + { + let slot = match commitment.commitment { + CommitmentLevel::Max => largest_confirmed_root, + CommitmentLevel::Recent => current_slot, + CommitmentLevel::Root => node_root, }; - if let Some(&slot) = desired_slot { - let results = { - let bank_forks = bank_forks.read().unwrap(); - let desired_bank = bank_forks.get(slot).unwrap(); - bank_method(&desired_bank, hashmap_key) - }; - for result in filter_results(results, root) { - notifier.notify( - Response { - context: RpcResponseContext { slot }, - value: result, - }, - sink, - ); - notified_set.insert(sub_id.clone()); - } + let results = { + let bank_forks = bank_forks.read().unwrap(); + bank_forks + .get(slot) + .map(|desired_bank| bank_method(&desired_bank, hashmap_key)) + .unwrap_or_default() + }; + let mut w_last_notified_slot = last_notified_slot.write().unwrap(); + let (filter_results, result_slot) = filter_results(results, *w_last_notified_slot); + for result in filter_results { + notifier.notify( + Response { + context: RpcResponseContext { slot }, + value: result, + }, + sink, + ); + notified_set.insert(sub_id.clone()); + *w_last_notified_slot = result_slot; } } } @@ -199,50 +203,63 @@ impl RpcNotifier { fn filter_account_result( result: Option<(Account, Slot)>, - root: Slot, -) -> Box> { + last_notified_slot: Slot, +) -> (Box>, Slot) { if let Some((account, fork)) = result { - if fork >= root { - return Box::new(iter::once(RpcAccount::encode(account))); + if fork != last_notified_slot { + return (Box::new(iter::once(RpcAccount::encode(account))), fork); } } - Box::new(iter::empty()) + (Box::new(iter::empty()), last_notified_slot) } fn filter_signature_result( result: Option>, - _root: Slot, -) -> Box> { - Box::new( - result - .into_iter() - .map(|result| RpcSignatureResult { err: result.err() }), + last_notified_slot: Slot, +) -> (Box>, Slot) { + ( + Box::new( + result + .into_iter() + .map(|result| RpcSignatureResult { err: result.err() }), + ), + last_notified_slot, ) } fn filter_program_results( accounts: Vec<(Pubkey, Account)>, - _root: Slot, -) -> Box> { - Box::new( - accounts - .into_iter() - .map(|(pubkey, account)| RpcKeyedAccount { - pubkey: pubkey.to_string(), - account: RpcAccount::encode(account), - }), + last_notified_slot: Slot, +) -> (Box>, Slot) { + ( + Box::new( + accounts + .into_iter() + .map(|(pubkey, account)| RpcKeyedAccount { + pubkey: pubkey.to_string(), + account: RpcAccount::encode(account), + }), + ), + last_notified_slot, ) } -pub struct RpcSubscriptions { +#[derive(Clone)] +struct Subscriptions { account_subscriptions: Arc, program_subscriptions: Arc, signature_subscriptions: Arc, slot_subscriptions: Arc, root_subscriptions: Arc, +} + +pub struct RpcSubscriptions { + subscriptions: Subscriptions, notification_sender: Arc>>, t_cleanup: Option>, notifier_runtime: Option, + bank_forks: Arc>, + block_commitment_cache: Arc>, exit: Arc, } @@ -257,6 +274,7 @@ impl Drop for RpcSubscriptions { impl RpcSubscriptions { pub fn new( exit: &Arc, + bank_forks: Arc>, block_commitment_cache: Arc>, ) -> Self { let (notification_sender, notification_receiver): ( @@ -271,12 +289,17 @@ impl RpcSubscriptions { let root_subscriptions = Arc::new(RpcRootSubscriptions::default()); let notification_sender = Arc::new(Mutex::new(notification_sender)); + let _bank_forks = bank_forks.clone(); + let _block_commitment_cache = block_commitment_cache.clone(); let exit_clone = exit.clone(); - let account_subscriptions_clone = account_subscriptions.clone(); - let program_subscriptions_clone = program_subscriptions.clone(); - let signature_subscriptions_clone = signature_subscriptions.clone(); - let slot_subscriptions_clone = slot_subscriptions.clone(); - let root_subscriptions_clone = root_subscriptions.clone(); + let subscriptions = Subscriptions { + account_subscriptions, + program_subscriptions, + signature_subscriptions, + slot_subscriptions, + root_subscriptions, + }; + let _subscriptions = subscriptions.clone(); let notifier_runtime = RuntimeBuilder::new() .core_threads(1) @@ -292,32 +315,31 @@ impl RpcSubscriptions { exit_clone, notifier, notification_receiver, - account_subscriptions_clone, - program_subscriptions_clone, - signature_subscriptions_clone, - slot_subscriptions_clone, - root_subscriptions_clone, - block_commitment_cache, + _subscriptions, + _bank_forks, + _block_commitment_cache, ); }) .unwrap(); Self { - account_subscriptions, - program_subscriptions, - signature_subscriptions, - slot_subscriptions, - root_subscriptions, + subscriptions, notification_sender, notifier_runtime: Some(notifier_runtime), t_cleanup: Some(t_cleanup), + bank_forks, + block_commitment_cache, exit: exit.clone(), } } - pub fn default_with_blockstore(blockstore: Arc) -> Self { + pub fn default_with_blockstore_bank_forks( + blockstore: Arc, + bank_forks: Arc>, + ) -> Self { Self::new( &Arc::new(AtomicBool::new(false)), + bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( blockstore, ))), @@ -332,12 +354,12 @@ impl RpcSubscriptions { notifier: &RpcNotifier, ) { let subscriptions = account_subscriptions.read().unwrap(); - check_confirmations_and_notify( + check_commitment_and_notify( &subscriptions, pubkey, bank_forks, block_commitment_cache, - Bank::get_account_modified_since_parent, + Bank::get_account_modified_slot, filter_account_result, notifier, ); @@ -351,7 +373,7 @@ impl RpcSubscriptions { notifier: &RpcNotifier, ) { let subscriptions = program_subscriptions.read().unwrap(); - check_confirmations_and_notify( + check_commitment_and_notify( &subscriptions, program_id, bank_forks, @@ -370,7 +392,7 @@ impl RpcSubscriptions { notifier: &RpcNotifier, ) { let mut subscriptions = signature_subscriptions.write().unwrap(); - let notified_ids = check_confirmations_and_notify( + let notified_ids = check_commitment_and_notify( &subscriptions, signature, bank_forks, @@ -390,83 +412,109 @@ impl RpcSubscriptions { pub fn add_account_subscription( &self, pubkey: Pubkey, - confirmations: Option, + commitment: Option, sub_id: SubscriptionId, subscriber: Subscriber>, ) { - let mut subscriptions = self.account_subscriptions.write().unwrap(); + let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap(); + let slot = match commitment + .unwrap_or_else(CommitmentConfig::recent) + .commitment + { + CommitmentLevel::Max => self + .block_commitment_cache + .read() + .unwrap() + .largest_confirmed_root(), + CommitmentLevel::Recent => self.block_commitment_cache.read().unwrap().slot(), + CommitmentLevel::Root => self.block_commitment_cache.read().unwrap().root(), + }; + let last_notified_slot = if let Some((_account, slot)) = self + .bank_forks + .read() + .unwrap() + .get(slot) + .and_then(|bank| bank.get_account_modified_slot(&pubkey)) + { + slot + } else { + 0 + }; add_subscription( &mut subscriptions, pubkey, - confirmations, + commitment, sub_id, subscriber, + last_notified_slot, ); } pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { - let mut subscriptions = self.account_subscriptions.write().unwrap(); + let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap(); remove_subscription(&mut subscriptions, id) } pub fn add_program_subscription( &self, program_id: Pubkey, - confirmations: Option, + commitment: Option, sub_id: SubscriptionId, subscriber: Subscriber>, ) { - let mut subscriptions = self.program_subscriptions.write().unwrap(); + let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap(); add_subscription( &mut subscriptions, program_id, - confirmations, + commitment, sub_id, subscriber, + 0, // last_notified_slot is not utilized for program subscriptions ); } pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool { - let mut subscriptions = self.program_subscriptions.write().unwrap(); + let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap(); remove_subscription(&mut subscriptions, id) } pub fn add_signature_subscription( &self, signature: Signature, - confirmations: Option, + commitment: Option, sub_id: SubscriptionId, subscriber: Subscriber>, ) { - let mut subscriptions = self.signature_subscriptions.write().unwrap(); + let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap(); add_subscription( &mut subscriptions, signature, - confirmations, + commitment, sub_id, subscriber, + 0, // last_notified_slot is not utilized for signature subscriptions ); } pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { - let mut subscriptions = self.signature_subscriptions.write().unwrap(); + let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap(); remove_subscription(&mut subscriptions, id) } /// Notify subscribers of changes to any accounts or new signatures since /// the bank's last checkpoint. - pub fn notify_subscribers(&self, current_slot: Slot, bank_forks: &Arc>) { - self.enqueue_notification(NotificationEntry::Bank((current_slot, bank_forks.clone()))); + pub fn notify_subscribers(&self, current_slot: Slot) { + self.enqueue_notification(NotificationEntry::Bank(current_slot)); } pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let mut subscriptions = self.slot_subscriptions.write().unwrap(); + let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap(); subscriptions.insert(sub_id, sink); } pub fn remove_slot_subscription(&self, id: &SubscriptionId) -> bool { - let mut subscriptions = self.slot_subscriptions.write().unwrap(); + let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap(); subscriptions.remove(id).is_some() } @@ -476,12 +524,12 @@ impl RpcSubscriptions { pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let mut subscriptions = self.root_subscriptions.write().unwrap(); + let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap(); subscriptions.insert(sub_id, sink); } pub fn remove_root_subscription(&self, id: &SubscriptionId) -> bool { - let mut subscriptions = self.root_subscriptions.write().unwrap(); + let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap(); subscriptions.remove(id).is_some() } @@ -513,11 +561,8 @@ impl RpcSubscriptions { exit: Arc, notifier: RpcNotifier, notification_receiver: Receiver, - account_subscriptions: Arc, - program_subscriptions: Arc, - signature_subscriptions: Arc, - slot_subscriptions: Arc, - root_subscriptions: Arc, + subscriptions: Subscriptions, + bank_forks: Arc>, block_commitment_cache: Arc>, ) { loop { @@ -527,20 +572,20 @@ impl RpcSubscriptions { match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) { Ok(notification_entry) => match notification_entry { NotificationEntry::Slot(slot_info) => { - let subscriptions = slot_subscriptions.read().unwrap(); + let subscriptions = subscriptions.slot_subscriptions.read().unwrap(); for (_, sink) in subscriptions.iter() { notifier.notify(slot_info, sink); } } NotificationEntry::Root(root) => { - let subscriptions = root_subscriptions.read().unwrap(); + let subscriptions = subscriptions.root_subscriptions.read().unwrap(); for (_, sink) in subscriptions.iter() { notifier.notify(root, sink); } } - NotificationEntry::Bank((_current_slot, bank_forks)) => { + NotificationEntry::Bank(_current_slot) => { let pubkeys: Vec<_> = { - let subs = account_subscriptions.read().unwrap(); + let subs = subscriptions.account_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; for pubkey in &pubkeys { @@ -548,13 +593,13 @@ impl RpcSubscriptions { pubkey, &bank_forks, &block_commitment_cache, - account_subscriptions.clone(), + subscriptions.account_subscriptions.clone(), ¬ifier, ); } let programs: Vec<_> = { - let subs = program_subscriptions.read().unwrap(); + let subs = subscriptions.program_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; for program_id in &programs { @@ -562,13 +607,13 @@ impl RpcSubscriptions { program_id, &bank_forks, &block_commitment_cache, - program_subscriptions.clone(), + subscriptions.program_subscriptions.clone(), ¬ifier, ); } let signatures: Vec<_> = { - let subs = signature_subscriptions.read().unwrap(); + let subs = subscriptions.signature_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; for signature in &signatures { @@ -576,7 +621,7 @@ impl RpcSubscriptions { signature, &bank_forks, &block_commitment_cache, - signature_subscriptions.clone(), + subscriptions.signature_subscriptions.clone(), ¬ifier, ); } @@ -671,7 +716,35 @@ pub(crate) mod tests { let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); let alice = Keypair::new(); + + let (subscriber, _id_receiver, transport_receiver) = + Subscriber::new_test("accountNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new( + &exit, + bank_forks.clone(), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore_bank( + blockstore, + bank_forks.read().unwrap().get(1).unwrap().clone(), + 1, + ), + )), + ); + subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber); + + assert!(subscriptions + .subscriptions + .account_subscriptions + .read() + .unwrap() + .contains_key(&alice.pubkey())); + let tx = system_transaction::create_account( &mint_keypair, &alice, @@ -683,37 +756,18 @@ pub(crate) mod tests { bank_forks .write() .unwrap() - .get(0) + .get(1) .unwrap() .process_transaction(&tx) .unwrap(); - - let (subscriber, _id_receiver, transport_receiver) = - Subscriber::new_test("accountNotification"); - let sub_id = SubscriptionId::Number(0 as u64); - let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new( - &exit, - Arc::new(RwLock::new( - BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), - )), - ); - subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber); - - assert!(subscriptions - .account_subscriptions - .read() - .unwrap() - .contains_key(&alice.pubkey())); - - subscriptions.notify_subscribers(0, &bank_forks); + subscriptions.notify_subscribers(1); let (response, _) = robust_poll_or_panic(transport_receiver); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", "params": { "result": { - "context": { "slot": 0 }, + "context": { "slot": 1 }, "value": { "data": "1111111111111111", "executable": false, @@ -729,6 +783,7 @@ pub(crate) mod tests { subscriptions.remove_account_subscription(&sub_id); assert!(!subscriptions + .subscriptions .account_subscriptions .read() .unwrap() @@ -771,6 +826,7 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new( &exit, + bank_forks, Arc::new(RwLock::new( BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), )), @@ -783,12 +839,13 @@ pub(crate) mod tests { ); assert!(subscriptions + .subscriptions .program_subscriptions .read() .unwrap() .contains_key(&solana_budget_program::id())); - subscriptions.notify_subscribers(0, &bank_forks); + subscriptions.notify_subscribers(0); let (response, _) = robust_poll_or_panic(transport_receiver); let expected = json!({ "jsonrpc": "2.0", @@ -814,6 +871,7 @@ pub(crate) mod tests { subscriptions.remove_program_subscription(&sub_id); assert!(!subscriptions + .subscriptions .program_subscriptions .read() .unwrap() @@ -872,8 +930,11 @@ pub(crate) mod tests { BlockCommitmentCache::new(block_commitment, 0, 10, bank1, blockstore, 0); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = - RpcSubscriptions::new(&exit, Arc::new(RwLock::new(block_commitment_cache))); + let subscriptions = RpcSubscriptions::new( + &exit, + bank_forks, + Arc::new(RwLock::new(block_commitment_cache)), + ); let (past_bank_sub1, _id_receiver, past_bank_recv1) = Subscriber::new_test("signatureNotification"); @@ -884,37 +945,41 @@ pub(crate) mod tests { subscriptions.add_signature_subscription( past_bank_tx.signatures[0], - Some(0), + Some(CommitmentConfig::recent()), SubscriptionId::Number(1 as u64), past_bank_sub1, ); subscriptions.add_signature_subscription( past_bank_tx.signatures[0], - Some(1), + Some(CommitmentConfig::root()), SubscriptionId::Number(2 as u64), past_bank_sub2, ); subscriptions.add_signature_subscription( processed_tx.signatures[0], - Some(0), + Some(CommitmentConfig::recent()), SubscriptionId::Number(3 as u64), processed_sub, ); subscriptions.add_signature_subscription( unprocessed_tx.signatures[0], - Some(0), + Some(CommitmentConfig::recent()), SubscriptionId::Number(4 as u64), Subscriber::new_test("signatureNotification").0, ); { - let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); + let sig_subs = subscriptions + .subscriptions + .signature_subscriptions + .read() + .unwrap(); assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 2); assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0])); assert!(sig_subs.contains_key(&processed_tx.signatures[0])); } - subscriptions.notify_subscribers(1, &bank_forks); + subscriptions.notify_subscribers(1); let expected_res = RpcSignatureResult { err: None }; struct Notification { @@ -954,7 +1019,11 @@ pub(crate) mod tests { assert_eq!(expected, response); // Subscription should be automatically removed after notification - let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); + let sig_subs = subscriptions + .subscriptions + .signature_subscriptions + .read() + .unwrap(); assert!(!sig_subs.contains_key(&processed_tx.signatures[0])); assert!(!sig_subs.contains_key(&past_bank_tx.signatures[0])); @@ -974,8 +1043,12 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let subscriptions = RpcSubscriptions::new( &exit, + bank_forks, Arc::new(RwLock::new( BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), )), @@ -983,6 +1056,7 @@ pub(crate) mod tests { subscriptions.add_slot_subscription(sub_id.clone(), subscriber); assert!(subscriptions + .subscriptions .slot_subscriptions .read() .unwrap() @@ -1005,6 +1079,7 @@ pub(crate) mod tests { subscriptions.remove_slot_subscription(&sub_id); assert!(!subscriptions + .subscriptions .slot_subscriptions .read() .unwrap() @@ -1020,8 +1095,12 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let subscriptions = RpcSubscriptions::new( &exit, + bank_forks, Arc::new(RwLock::new( BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), )), @@ -1029,6 +1108,7 @@ pub(crate) mod tests { subscriptions.add_root_subscription(sub_id.clone(), subscriber); assert!(subscriptions + .subscriptions .root_subscriptions .read() .unwrap() @@ -1050,6 +1130,7 @@ pub(crate) mod tests { subscriptions.remove_root_subscription(&sub_id); assert!(!subscriptions + .subscriptions .root_subscriptions .read() .unwrap() @@ -1059,7 +1140,7 @@ pub(crate) mod tests { #[test] #[serial] fn test_add_and_remove_subscription() { - let mut subscriptions: HashMap, Confirmations)>> = + let mut subscriptions: HashMap>> = HashMap::new(); let num_keys = 5; @@ -1067,7 +1148,7 @@ pub(crate) mod tests { let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("notification"); let sub_id = SubscriptionId::Number(key); - add_subscription(&mut subscriptions, key, None, sub_id, subscriber); + add_subscription(&mut subscriptions, key, None, sub_id, subscriber, 0); } // Add another subscription to the "0" key @@ -1079,6 +1160,7 @@ pub(crate) mod tests { None, extra_sub_id.clone(), subscriber, + 0, ); assert_eq!(subscriptions.len(), num_keys as usize); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index c5a270c58..1d9b7d4a9 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -304,11 +304,12 @@ pub mod tests { BlockCommitmentCache::default_with_blockstore(blockstore.clone()), )); let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); + let bank_forks = Arc::new(RwLock::new(bank_forks)); let tvu = Tvu::new( &vote_keypair.pubkey(), vec![Arc::new(vote_keypair)], &storage_keypair, - &Arc::new(RwLock::new(bank_forks)), + &bank_forks, &cref1, { Sockets { @@ -321,7 +322,11 @@ pub mod tests { blockstore, &StorageState::default(), l_receiver, - &Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone())), + &Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + block_commitment_cache.clone(), + )), &poh_recorder, &leader_schedule_cache, &exit, diff --git a/core/src/validator.rs b/core/src/validator.rs index bb9089d3d..4f55b6bb8 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -241,7 +241,11 @@ impl Validator { BlockCommitmentCache::default_with_blockstore(blockstore.clone()), )); - let subscriptions = Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone())); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + block_commitment_cache.clone(), + )); let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { if ContactInfo::is_valid_address(&node.info.rpc) { diff --git a/core/tests/client.rs b/core/tests/client.rs index 7e16276a1..0c5755ee8 100644 --- a/core/tests/client.rs +++ b/core/tests/client.rs @@ -6,7 +6,13 @@ use solana_core::{ commitment::BlockCommitmentCache, rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, validator::TestValidator, }; -use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; +use solana_ledger::{ + bank_forks::BankForks, + blockstore::Blockstore, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + get_tmp_ledger_path, +}; +use solana_runtime::bank::Bank; use solana_sdk::{ commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::Signer, system_transaction, @@ -88,8 +94,12 @@ fn test_slot_subscription() { let exit = Arc::new(AtomicBool::new(false)); let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let subscriptions = Arc::new(RpcSubscriptions::new( &exit, + bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( blockstore, ))), diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index 1d12fb8c1..8189cf28c 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -9,7 +9,7 @@ use reqwest::{self, header::CONTENT_TYPE}; use serde_json::{json, Value}; use solana_client::{ rpc_client::{get_rpc_request_str, RpcClient}, - rpc_response::{Response, RpcSignatureResult}, + rpc_response::{Response, RpcAccount, RpcSignatureResult}, }; use solana_core::contact_info::ContactInfo; use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator}; @@ -164,9 +164,15 @@ fn test_rpc_subscriptions() { .iter() .map(|tx| tx.signatures[0].to_string()) .collect(); + let account_set: HashSet = transactions + .iter() + .map(|tx| tx.message.account_keys[1].to_string()) + .collect(); // Track when subscriptions are ready let (ready_sender, ready_receiver) = channel::<()>(); + // Track account notifications are received + let (account_sender, account_receiver) = channel::>(); // Track when status notifications are received let (status_sender, status_receiver) = channel::<(String, Response)>(); @@ -209,6 +215,22 @@ fn test_rpc_subscriptions() { eprintln!("slot sub err: {:#?}", err); }), ); + for pubkey in account_set { + let account_sender = account_sender.clone(); + tokio::spawn( + client + .account_subscribe(pubkey, None) + .and_then(move |account_stream| { + account_stream.for_each(move |result| { + account_sender.send(result).unwrap(); + future::ok(()) + }) + }) + .map_err(|err| { + eprintln!("acct sub err: {:#?}", err); + }), + ); + } future::ok(()) }) .map_err(|_| ()) @@ -262,6 +284,26 @@ fn test_rpc_subscriptions() { } } + let deadline = Instant::now() + Duration::from_secs(5); + let mut account_notifications = transactions.len(); + while account_notifications > 0 { + let timeout = deadline.saturating_duration_since(Instant::now()); + match account_receiver.recv_timeout(timeout) { + Ok(result) => { + assert_eq!(result.value.lamports, 1); + account_notifications -= 1; + } + Err(_err) => { + assert!( + false, + "recv_timeout, {}/{} accounts remaining", + account_notifications, + transactions.len() + ); + } + } + } + rt.shutdown_now().wait().unwrap(); server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 0edafebb2..addd71145 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1816,12 +1816,14 @@ impl Bank { } pub fn get_account(&self, pubkey: &Pubkey) -> Option { - self.rc - .accounts - .load_slow(&self.ancestors, pubkey) + self.get_account_modified_slot(pubkey) .map(|(acc, _slot)| acc) } + pub fn get_account_modified_slot(&self, pubkey: &Pubkey) -> Option<(Account, Slot)> { + self.rc.accounts.load_slow(&self.ancestors, pubkey) + } + // Exclude self to really fetch the parent Bank's account hash and data. // // Being idempotent is needed to make the lazy initialization possible,