Refactor RPC subscriptions account handling (#9888)

* Switch subscriptions to use commitment instead of confirmations

* Add bank method to return account and last-modified slot

* Add last_modified_slot to subscription data and use to filter account subscriptions

* Update tests to non-zero last_notified_slot

* Add accounts subscriptions to test; fails at higher tx load

* Pass BankForks to RpcSubscriptions

* Use BankForks on add_account_subscription to properly initialize last_notified_slot

* Bundle subscriptions

* Check for non-equality

* Use commitment to initialize last_notified_slot; revert context.slot chage
This commit is contained in:
Tyera Eulberg 2020-05-07 00:23:06 -06:00 committed by GitHub
parent f6e26f6c8c
commit 754c65c066
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 464 additions and 277 deletions

View File

@ -162,6 +162,24 @@ impl BlockCommitmentCache {
} }
} }
#[cfg(test)]
pub fn new_for_tests_with_blockstore_bank(
blockstore: Arc<Blockstore>,
bank: Arc<Bank>,
root: Slot,
) -> Self {
let mut block_commitment: HashMap<Slot, BlockCommitment> = HashMap::new();
block_commitment.insert(0, BlockCommitment::default());
Self {
block_commitment,
blockstore,
total_stake: 42,
largest_confirmed_root: root,
bank,
root,
}
}
#[cfg(test)] #[cfg(test)]
pub(crate) fn set_get_largest_confirmed_root(&mut self, root: Slot) { pub(crate) fn set_get_largest_confirmed_root(&mut self, root: Slot) {
self.largest_confirmed_root = root; self.largest_confirmed_root = root;

View File

@ -308,10 +308,8 @@ impl ReplayStage {
// Vote on a fork // Vote on a fork
if let Some(ref vote_bank) = vote_bank { if let Some(ref vote_bank) = vote_bank {
subscriptions.notify_subscribers( subscriptions
block_commitment_cache.read().unwrap().slot(), .notify_subscribers(block_commitment_cache.read().unwrap().slot());
&bank_forks,
);
if let Some(votable_leader) = if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(vote_bank.slot(), Some(vote_bank)) leader_schedule_cache.slot_leader_at(vote_bank.slot(), Some(vote_bank))
{ {
@ -2060,12 +2058,6 @@ pub(crate) mod tests {
); );
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
blockstore.clone(),
))),
));
let mut bank_forks = BankForks::new(0, bank0); let mut bank_forks = BankForks::new(0, bank0);
// Insert a non-root bank so that the propagation logic will update this // Insert a non-root bank so that the propagation logic will update this
@ -2089,7 +2081,14 @@ pub(crate) mod tests {
assert!(progress.get_propagated_stats(1).unwrap().is_leader_slot); assert!(progress.get_propagated_stats(1).unwrap().is_leader_slot);
bank1.freeze(); bank1.freeze();
bank_forks.insert(bank1); bank_forks.insert(bank1);
let bank_forks = RwLock::new(bank_forks); let bank_forks = Arc::new(RwLock::new(bank_forks));
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
blockstore.clone(),
))),
));
// Insert shreds for slot NUM_CONSECUTIVE_LEADER_SLOTS, // Insert shreds for slot NUM_CONSECUTIVE_LEADER_SLOTS,
// chaining to slot 1 // chaining to slot 1

View File

@ -1,6 +1,6 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request //! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions, SlotInfo}; use crate::rpc_subscriptions::{RpcSubscriptions, SlotInfo};
use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc; use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
@ -8,8 +8,12 @@ use solana_client::rpc_response::{
Response as RpcResponse, RpcAccount, RpcKeyedAccount, RpcSignatureResult, Response as RpcResponse, RpcAccount, RpcKeyedAccount, RpcSignatureResult,
}; };
#[cfg(test)] #[cfg(test)]
use solana_ledger::blockstore::Blockstore; use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore};
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; use solana_sdk::{
clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature,
};
#[cfg(test)]
use std::sync::RwLock;
use std::{ use std::{
str::FromStr, str::FromStr,
sync::{atomic, Arc}, sync::{atomic, Arc},
@ -35,7 +39,7 @@ pub trait RpcSolPubSub {
meta: Self::Metadata, meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcAccount>>, subscriber: Subscriber<RpcResponse<RpcAccount>>,
pubkey_str: String, pubkey_str: String,
confirmations: Option<Confirmations>, commitment: Option<CommitmentConfig>,
); );
// Unsubscribe from account notification subscription. // Unsubscribe from account notification subscription.
@ -59,7 +63,7 @@ pub trait RpcSolPubSub {
meta: Self::Metadata, meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>, subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
pubkey_str: String, pubkey_str: String,
confirmations: Option<Confirmations>, commitment: Option<CommitmentConfig>,
); );
// Unsubscribe from account notification subscription. // Unsubscribe from account notification subscription.
@ -83,7 +87,7 @@ pub trait RpcSolPubSub {
meta: Self::Metadata, meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcSignatureResult>>, subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
signature_str: String, signature_str: String,
confirmations: Option<Confirmations>, commitment: Option<CommitmentConfig>,
); );
// Unsubscribe from signature notification subscription. // Unsubscribe from signature notification subscription.
@ -135,9 +139,14 @@ impl RpcSolPubSubImpl {
} }
#[cfg(test)] #[cfg(test)]
fn default_with_blockstore(blockstore: Arc<Blockstore>) -> Self { fn default_with_blockstore_bank_forks(
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
let uid = Arc::new(atomic::AtomicUsize::default()); let uid = Arc::new(atomic::AtomicUsize::default());
let subscriptions = Arc::new(RpcSubscriptions::default_with_blockstore(blockstore)); let subscriptions = Arc::new(RpcSubscriptions::default_with_blockstore_bank_forks(
blockstore, bank_forks,
));
Self { uid, subscriptions } Self { uid, subscriptions }
} }
} }
@ -158,19 +167,15 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
_meta: Self::Metadata, _meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcAccount>>, subscriber: Subscriber<RpcResponse<RpcAccount>>,
pubkey_str: String, pubkey_str: String,
confirmations: Option<Confirmations>, commitment: Option<CommitmentConfig>,
) { ) {
match param::<Pubkey>(&pubkey_str, "pubkey") { match param::<Pubkey>(&pubkey_str, "pubkey") {
Ok(pubkey) => { Ok(pubkey) => {
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
let sub_id = SubscriptionId::Number(id as u64); let sub_id = SubscriptionId::Number(id as u64);
info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id); info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id);
self.subscriptions.add_account_subscription( self.subscriptions
pubkey, .add_account_subscription(pubkey, commitment, sub_id, subscriber)
confirmations,
sub_id,
subscriber,
)
} }
Err(e) => subscriber.reject(e).unwrap(), Err(e) => subscriber.reject(e).unwrap(),
} }
@ -198,19 +203,15 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
_meta: Self::Metadata, _meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>, subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
pubkey_str: String, pubkey_str: String,
confirmations: Option<Confirmations>, commitment: Option<CommitmentConfig>,
) { ) {
match param::<Pubkey>(&pubkey_str, "pubkey") { match param::<Pubkey>(&pubkey_str, "pubkey") {
Ok(pubkey) => { Ok(pubkey) => {
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
let sub_id = SubscriptionId::Number(id as u64); let sub_id = SubscriptionId::Number(id as u64);
info!("program_subscribe: account={:?} id={:?}", pubkey, sub_id); info!("program_subscribe: account={:?} id={:?}", pubkey, sub_id);
self.subscriptions.add_program_subscription( self.subscriptions
pubkey, .add_program_subscription(pubkey, commitment, sub_id, subscriber)
confirmations,
sub_id,
subscriber,
)
} }
Err(e) => subscriber.reject(e).unwrap(), Err(e) => subscriber.reject(e).unwrap(),
} }
@ -238,7 +239,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
_meta: Self::Metadata, _meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcSignatureResult>>, subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
signature_str: String, signature_str: String,
confirmations: Option<Confirmations>, commitment: Option<CommitmentConfig>,
) { ) {
info!("signature_subscribe"); info!("signature_subscribe");
match param::<Signature>(&signature_str, "signature") { match param::<Signature>(&signature_str, "signature") {
@ -249,12 +250,8 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
"signature_subscribe: signature={:?} id={:?}", "signature_subscribe: signature={:?} id={:?}",
signature, sub_id signature, sub_id
); );
self.subscriptions.add_signature_subscription( self.subscriptions
signature, .add_signature_subscription(signature, commitment, sub_id, subscriber);
confirmations,
sub_id,
subscriber,
);
} }
Err(e) => subscriber.reject(e).unwrap(), Err(e) => subscriber.reject(e).unwrap(),
} }
@ -354,14 +351,15 @@ mod tests {
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
tx: &Transaction, tx: &Transaction,
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
slot: Slot,
) -> transaction::Result<()> { ) -> transaction::Result<()> {
bank_forks bank_forks
.write() .write()
.unwrap() .unwrap()
.get(0) .get(slot)
.unwrap() .unwrap()
.process_transaction(tx)?; .process_transaction(tx)?;
subscriptions.notify_subscribers(0, &bank_forks); subscriptions.notify_subscribers(slot);
Ok(()) Ok(())
} }
@ -387,6 +385,7 @@ mod tests {
let rpc = RpcSolPubSubImpl { let rpc = RpcSolPubSubImpl {
subscriptions: Arc::new(RpcSubscriptions::new( subscriptions: Arc::new(RpcSubscriptions::new(
&Arc::new(AtomicBool::new(false)), &Arc::new(AtomicBool::new(false)),
bank_forks.clone(),
Arc::new(RwLock::new( Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), BlockCommitmentCache::new_for_tests_with_blockstore(blockstore),
)), )),
@ -401,7 +400,7 @@ mod tests {
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification");
rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None); rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None);
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 0).unwrap();
// Test signature confirmation notification // Test signature confirmation notification
let (response, _) = robust_poll_or_panic(receiver); let (response, _) = robust_poll_or_panic(receiver);
@ -430,15 +429,15 @@ mod tests {
} = create_genesis_config(10_000); } = create_genesis_config(10_000);
let bob_pubkey = Pubkey::new_rand(); let bob_pubkey = Pubkey::new_rand();
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let arc_bank = Arc::new(bank); let blockhash = bank.last_blockhash();
let blockhash = arc_bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let session = create_session(); let session = create_session();
let mut io = PubSubHandler::default(); let mut io = PubSubHandler::default();
let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); let rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks);
io.extend_with(rpc.to_delegate()); io.extend_with(rpc.to_delegate());
let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash); let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash);
@ -493,14 +492,22 @@ mod tests {
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let rpc = RpcSolPubSubImpl { let rpc = RpcSolPubSubImpl {
subscriptions: Arc::new(RpcSubscriptions::new( subscriptions: Arc::new(RpcSubscriptions::new(
&Arc::new(AtomicBool::new(false)), &Arc::new(AtomicBool::new(false)),
bank_forks.clone(),
Arc::new(RwLock::new( Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), BlockCommitmentCache::new_for_tests_with_blockstore_bank(
blockstore,
bank_forks.read().unwrap().get(1).unwrap().clone(),
1,
),
)), )),
)), )),
uid: Arc::new(atomic::AtomicUsize::default()), uid: Arc::new(atomic::AtomicUsize::default()),
@ -515,7 +522,7 @@ mod tests {
); );
let tx = system_transaction::transfer(&alice, &contract_funds.pubkey(), 51, blockhash); let tx = system_transaction::transfer(&alice, &contract_funds.pubkey(), 51, blockhash);
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 1).unwrap();
let ixs = budget_instruction::when_signed( let ixs = budget_instruction::when_signed(
&contract_funds.pubkey(), &contract_funds.pubkey(),
@ -530,14 +537,14 @@ mod tests {
&ixs, &ixs,
blockhash, blockhash,
); );
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 1).unwrap();
sleep(Duration::from_millis(200)); sleep(Duration::from_millis(200));
// Test signature confirmation notification #1 // Test signature confirmation notification #1
let expected_data = bank_forks let expected_data = bank_forks
.read() .read()
.unwrap() .unwrap()
.get(0) .get(1)
.unwrap() .unwrap()
.get_account(&contract_state.pubkey()) .get_account(&contract_state.pubkey())
.unwrap() .unwrap()
@ -547,7 +554,7 @@ mod tests {
"method": "accountNotification", "method": "accountNotification",
"params": { "params": {
"result": { "result": {
"context": { "slot": 0 }, "context": { "slot": 1 },
"value": { "value": {
"owner": budget_program_id.to_string(), "owner": budget_program_id.to_string(),
"lamports": 51, "lamports": 51,
@ -564,7 +571,7 @@ mod tests {
assert_eq!(serde_json::to_string(&expected).unwrap(), response); assert_eq!(serde_json::to_string(&expected).unwrap(), response);
let tx = system_transaction::transfer(&alice, &witness.pubkey(), 1, blockhash); let tx = system_transaction::transfer(&alice, &witness.pubkey(), 1, blockhash);
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 1).unwrap();
sleep(Duration::from_millis(200)); sleep(Duration::from_millis(200));
let ix = budget_instruction::apply_signature( let ix = budget_instruction::apply_signature(
&witness.pubkey(), &witness.pubkey(),
@ -572,14 +579,14 @@ mod tests {
&bob_pubkey, &bob_pubkey,
); );
let tx = Transaction::new_signed_instructions(&[&witness], &[ix], blockhash); let tx = Transaction::new_signed_instructions(&[&witness], &[ix], blockhash);
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 1).unwrap();
sleep(Duration::from_millis(200)); sleep(Duration::from_millis(200));
assert_eq!( assert_eq!(
bank_forks bank_forks
.read() .read()
.unwrap() .unwrap()
.get(0) .get(1)
.unwrap() .unwrap()
.get_account(&contract_state.pubkey()), .get_account(&contract_state.pubkey()),
None None
@ -593,9 +600,12 @@ mod tests {
let session = create_session(); let session = create_session();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, Bank::new(&genesis_config))));
let mut io = PubSubHandler::default(); let mut io = PubSubHandler::default();
let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); let rpc =
RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks.clone());
io.extend_with(rpc.to_delegate()); io.extend_with(rpc.to_delegate());
@ -630,7 +640,7 @@ mod tests {
#[test] #[test]
#[should_panic] #[should_panic]
fn test_account_confirmations_not_fulfilled() { fn test_account_commitment_not_fulfilled() {
let GenesisConfigInfo { let GenesisConfigInfo {
genesis_config, genesis_config,
mint_keypair: alice, mint_keypair: alice,
@ -638,15 +648,19 @@ mod tests {
} = create_genesis_config(10_000); } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(1, bank)));
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let bob = Keypair::new(); let bob = Keypair::new();
let mut rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore.clone()); let mut rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(
blockstore.clone(),
bank_forks.clone(),
);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new( let subscriptions = RpcSubscriptions::new(
&exit, &exit,
bank_forks.clone(),
Arc::new(RwLock::new( Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), BlockCommitmentCache::new_for_tests_with_blockstore(blockstore),
)), )),
@ -654,24 +668,29 @@ mod tests {
rpc.subscriptions = Arc::new(subscriptions); rpc.subscriptions = Arc::new(subscriptions);
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); rpc.account_subscribe(
session,
subscriber,
bob.pubkey().to_string(),
Some(CommitmentConfig::root()),
);
let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash);
bank_forks bank_forks
.write() .write()
.unwrap() .unwrap()
.get(0) .get(1)
.unwrap() .unwrap()
.process_transaction(&tx) .process_transaction(&tx)
.unwrap(); .unwrap();
rpc.subscriptions.notify_subscribers(0, &bank_forks); rpc.subscriptions.notify_subscribers(0);
// allow 200ms for notification thread to wake // allow 200ms for notification thread to wake
std::thread::sleep(Duration::from_millis(200)); std::thread::sleep(Duration::from_millis(200));
let _panic = robust_poll_or_panic(receiver); let _panic = robust_poll_or_panic(receiver);
} }
#[test] #[test]
fn test_account_confirmations() { fn test_account_commitment() {
let GenesisConfigInfo { let GenesisConfigInfo {
genesis_config, genesis_config,
mint_keypair: alice, mint_keypair: alice,
@ -680,75 +699,65 @@ mod tests {
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let bob = Keypair::new(); let bob = Keypair::new();
let mut rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore.clone()); let mut rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(
blockstore.clone(),
bank_forks.clone(),
);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let block_commitment_cache = Arc::new(RwLock::new( let block_commitment_cache = Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore.clone()), BlockCommitmentCache::new_for_tests_with_blockstore(blockstore.clone()),
)); ));
let subscriptions = RpcSubscriptions::new(&exit, block_commitment_cache.clone()); let subscriptions =
RpcSubscriptions::new(&exit, bank_forks.clone(), block_commitment_cache.clone());
rpc.subscriptions = Arc::new(subscriptions); rpc.subscriptions = Arc::new(subscriptions);
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); rpc.account_subscribe(
session,
subscriber,
bob.pubkey().to_string(),
Some(CommitmentConfig::root()),
);
let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash);
bank_forks bank_forks
.write() .write()
.unwrap() .unwrap()
.get(0) .get(1)
.unwrap() .unwrap()
.process_transaction(&tx) .process_transaction(&tx)
.unwrap(); .unwrap();
rpc.subscriptions.notify_subscribers(0, &bank_forks); rpc.subscriptions.notify_subscribers(1);
let bank0 = bank_forks.read().unwrap()[0].clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bank1 = bank_forks.read().unwrap()[1].clone(); let bank1 = bank_forks.read().unwrap()[1].clone();
let mut cache0 = BlockCommitment::default();
cache0.increase_confirmation_stake(1, 10);
let mut block_commitment = HashMap::new();
block_commitment.entry(0).or_insert(cache0.clone());
let mut new_block_commitment = BlockCommitmentCache::new(
block_commitment,
0,
10,
bank1.clone(),
blockstore.clone(),
0,
);
let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment);
drop(w_block_commitment_cache);
rpc.subscriptions.notify_subscribers(1, &bank_forks);
let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2); bank_forks.write().unwrap().insert(bank2);
bank_forks.write().unwrap().set_root(1, &None, None);
let bank2 = bank_forks.read().unwrap()[2].clone(); let bank2 = bank_forks.read().unwrap()[2].clone();
let mut cache0 = BlockCommitment::default(); let mut block_commitment: HashMap<Slot, BlockCommitment> = HashMap::new();
cache0.increase_confirmation_stake(2, 10); block_commitment.insert(0, BlockCommitment::default());
let mut block_commitment = HashMap::new();
block_commitment.entry(0).or_insert(cache0.clone());
let mut new_block_commitment = let mut new_block_commitment =
BlockCommitmentCache::new(block_commitment, 0, 10, bank2, blockstore.clone(), 0); BlockCommitmentCache::new(block_commitment, 1, 10, bank2, blockstore.clone(), 1);
let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();
std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment);
drop(w_block_commitment_cache); drop(w_block_commitment_cache);
rpc.subscriptions.notify_subscribers(2, &bank_forks); rpc.subscriptions.notify_subscribers(2);
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "accountNotification", "method": "accountNotification",
"params": { "params": {
"result": { "result": {
"context": { "slot": 0 }, "context": { "slot": 1 },
"value": { "value": {
"owner": system_program::id().to_string(), "owner": system_program::id().to_string(),
"lamports": 100, "lamports": 100,
@ -769,7 +778,10 @@ mod tests {
fn test_slot_subscribe() { fn test_slot_subscribe() {
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks);
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification");
rpc.slot_subscribe(session, subscriber); rpc.slot_subscribe(session, subscriber);
@ -796,7 +808,10 @@ mod tests {
fn test_slot_unsubscribe() { fn test_slot_unsubscribe() {
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let rpc = RpcSolPubSubImpl::default_with_blockstore(blockstore); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let rpc = RpcSolPubSubImpl::default_with_blockstore_bank_forks(blockstore, bank_forks);
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification");
rpc.slot_subscribe(session, subscriber); rpc.slot_subscribe(session, subscriber);

View File

@ -73,7 +73,13 @@ impl PubSubService {
mod tests { mod tests {
use super::*; use super::*;
use crate::commitment::BlockCommitmentCache; use crate::commitment::BlockCommitmentCache;
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; use solana_ledger::{
bank_forks::BankForks,
blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
};
use solana_runtime::bank::Bank;
use std::{ use std::{
net::{IpAddr, Ipv4Addr}, net::{IpAddr, Ipv4Addr},
sync::RwLock, sync::RwLock,
@ -85,8 +91,12 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let subscriptions = Arc::new(RpcSubscriptions::new( let subscriptions = Arc::new(RpcSubscriptions::new(
&exit, &exit,
bank_forks,
Arc::new(RwLock::new( Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), BlockCommitmentCache::new_for_tests_with_blockstore(blockstore),
)), )),

View File

@ -14,9 +14,13 @@ use solana_client::rpc_response::{
use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore}; use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore};
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::{ use solana_sdk::{
account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction, account::Account,
clock::Slot,
commitment_config::{CommitmentConfig, CommitmentLevel},
pubkey::Pubkey,
signature::Signature,
transaction,
}; };
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::sync::{ use std::sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, mpsc::{Receiver, RecvTimeoutError, SendError, Sender},
@ -24,7 +28,6 @@ use std::sync::{
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::{ use std::{
cmp::min,
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
iter, iter,
sync::{Arc, Mutex, RwLock}, sync::{Arc, Mutex, RwLock},
@ -33,8 +36,6 @@ use tokio::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor};
const RECEIVE_DELAY_MILLIS: u64 = 100; const RECEIVE_DELAY_MILLIS: u64 = 100;
pub type Confirmations = usize;
#[derive(Serialize, Deserialize, Clone, Copy, Debug)] #[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct SlotInfo { pub struct SlotInfo {
pub slot: Slot, pub slot: Slot,
@ -45,7 +46,7 @@ pub struct SlotInfo {
enum NotificationEntry { enum NotificationEntry {
Slot(SlotInfo), Slot(SlotInfo),
Root(Slot), Root(Slot),
Bank((Slot, Arc<RwLock<BankForks>>)), Bank(Slot),
} }
impl std::fmt::Debug for NotificationEntry { impl std::fmt::Debug for NotificationEntry {
@ -53,51 +54,57 @@ impl std::fmt::Debug for NotificationEntry {
match self { match self {
NotificationEntry::Root(root) => write!(f, "Root({})", root), NotificationEntry::Root(root) => write!(f, "Root({})", root),
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
NotificationEntry::Bank((current_slot, _)) => { NotificationEntry::Bank(current_slot) => {
write!(f, "Bank({{current_slot: {:?}}})", current_slot) write!(f, "Bank({{current_slot: {:?}}})", current_slot)
} }
} }
} }
} }
struct SubscriptionData<S> {
sink: Sink<S>,
commitment: CommitmentConfig,
last_notified_slot: RwLock<Slot>,
}
type RpcAccountSubscriptions = type RpcAccountSubscriptions =
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<Response<RpcAccount>>, Confirmations)>>>; RwLock<HashMap<Pubkey, HashMap<SubscriptionId, SubscriptionData<Response<RpcAccount>>>>>;
type RpcProgramSubscriptions = RwLock< type RpcProgramSubscriptions =
HashMap<Pubkey, HashMap<SubscriptionId, (Sink<Response<RpcKeyedAccount>>, Confirmations)>>, RwLock<HashMap<Pubkey, HashMap<SubscriptionId, SubscriptionData<Response<RpcKeyedAccount>>>>>;
>;
type RpcSignatureSubscriptions = RwLock< type RpcSignatureSubscriptions = RwLock<
HashMap< HashMap<Signature, HashMap<SubscriptionId, SubscriptionData<Response<RpcSignatureResult>>>>,
Signature,
HashMap<SubscriptionId, (Sink<Response<RpcSignatureResult>>, Confirmations)>,
>,
>; >;
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>; type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
type RpcRootSubscriptions = RwLock<HashMap<SubscriptionId, Sink<Slot>>>; type RpcRootSubscriptions = RwLock<HashMap<SubscriptionId, Sink<Slot>>>;
fn add_subscription<K, S>( fn add_subscription<K, S>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>, subscriptions: &mut HashMap<K, HashMap<SubscriptionId, SubscriptionData<S>>>,
hashmap_key: K, hashmap_key: K,
confirmations: Option<Confirmations>, commitment: Option<CommitmentConfig>,
sub_id: SubscriptionId, sub_id: SubscriptionId,
subscriber: Subscriber<S>, subscriber: Subscriber<S>,
last_notified_slot: Slot,
) where ) where
K: Eq + Hash, K: Eq + Hash,
S: Clone, S: Clone,
{ {
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let confirmations = confirmations.unwrap_or(0); let commitment = commitment.unwrap_or_else(CommitmentConfig::recent);
let confirmations = min(confirmations, MAX_LOCKOUT_HISTORY + 1); let subscription_data = SubscriptionData {
sink,
commitment,
last_notified_slot: RwLock::new(last_notified_slot),
};
if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) { if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) {
current_hashmap.insert(sub_id, (sink, confirmations)); current_hashmap.insert(sub_id, subscription_data);
return; return;
} }
let mut hashmap = HashMap::new(); let mut hashmap = HashMap::new();
hashmap.insert(sub_id, (sink, confirmations)); hashmap.insert(sub_id, subscription_data);
subscriptions.insert(hashmap_key, hashmap); subscriptions.insert(hashmap_key, hashmap);
} }
fn remove_subscription<K, S>( fn remove_subscription<K, S>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>, subscriptions: &mut HashMap<K, HashMap<SubscriptionId, SubscriptionData<S>>>,
sub_id: &SubscriptionId, sub_id: &SubscriptionId,
) -> bool ) -> bool
where where
@ -119,8 +126,8 @@ where
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
fn check_confirmations_and_notify<K, S, B, F, X>( fn check_commitment_and_notify<K, S, B, F, X>(
subscriptions: &HashMap<K, HashMap<SubscriptionId, (Sink<Response<S>>, Confirmations)>>, subscriptions: &HashMap<K, HashMap<SubscriptionId, SubscriptionData<Response<S>>>>,
hashmap_key: &K, hashmap_key: &K,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>, block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
@ -132,53 +139,50 @@ where
K: Eq + Hash + Clone + Copy, K: Eq + Hash + Clone + Copy,
S: Clone + Serialize, S: Clone + Serialize,
B: Fn(&Bank, &K) -> X, B: Fn(&Bank, &K) -> X,
F: Fn(X, u64) -> Box<dyn Iterator<Item = S>>, F: Fn(X, Slot) -> (Box<dyn Iterator<Item = S>>, Slot),
X: Clone + Serialize, X: Clone + Serialize + Default,
{ {
let mut confirmation_slots: HashMap<usize, Slot> = HashMap::new();
let r_block_commitment_cache = block_commitment_cache.read().unwrap(); let r_block_commitment_cache = block_commitment_cache.read().unwrap();
let current_slot = r_block_commitment_cache.slot(); let current_slot = r_block_commitment_cache.slot();
let root = r_block_commitment_cache.root(); let node_root = r_block_commitment_cache.root();
let current_ancestors = bank_forks let largest_confirmed_root = r_block_commitment_cache.largest_confirmed_root();
.read()
.unwrap()
.get(current_slot)
.unwrap()
.ancestors
.clone();
for (slot, _) in current_ancestors.iter() {
if let Some(confirmations) = r_block_commitment_cache.get_confirmation_count(*slot) {
confirmation_slots.entry(confirmations).or_insert(*slot);
}
}
drop(r_block_commitment_cache); drop(r_block_commitment_cache);
let mut notified_set: HashSet<SubscriptionId> = HashSet::new(); let mut notified_set: HashSet<SubscriptionId> = HashSet::new();
if let Some(hashmap) = subscriptions.get(hashmap_key) { if let Some(hashmap) = subscriptions.get(hashmap_key) {
for (sub_id, (sink, confirmations)) in hashmap.iter() { for (
let desired_slot = if *confirmations == 0 { sub_id,
Some(&current_slot) SubscriptionData {
} else if *confirmations == MAX_LOCKOUT_HISTORY + 1 { sink,
Some(&root) commitment,
} else { last_notified_slot,
confirmation_slots.get(confirmations) },
) in hashmap.iter()
{
let slot = match commitment.commitment {
CommitmentLevel::Max => largest_confirmed_root,
CommitmentLevel::Recent => current_slot,
CommitmentLevel::Root => node_root,
}; };
if let Some(&slot) = desired_slot { let results = {
let results = { let bank_forks = bank_forks.read().unwrap();
let bank_forks = bank_forks.read().unwrap(); bank_forks
let desired_bank = bank_forks.get(slot).unwrap(); .get(slot)
bank_method(&desired_bank, hashmap_key) .map(|desired_bank| bank_method(&desired_bank, hashmap_key))
}; .unwrap_or_default()
for result in filter_results(results, root) { };
notifier.notify( let mut w_last_notified_slot = last_notified_slot.write().unwrap();
Response { let (filter_results, result_slot) = filter_results(results, *w_last_notified_slot);
context: RpcResponseContext { slot }, for result in filter_results {
value: result, notifier.notify(
}, Response {
sink, context: RpcResponseContext { slot },
); value: result,
notified_set.insert(sub_id.clone()); },
} sink,
);
notified_set.insert(sub_id.clone());
*w_last_notified_slot = result_slot;
} }
} }
} }
@ -199,50 +203,63 @@ impl RpcNotifier {
fn filter_account_result( fn filter_account_result(
result: Option<(Account, Slot)>, result: Option<(Account, Slot)>,
root: Slot, last_notified_slot: Slot,
) -> Box<dyn Iterator<Item = RpcAccount>> { ) -> (Box<dyn Iterator<Item = RpcAccount>>, Slot) {
if let Some((account, fork)) = result { if let Some((account, fork)) = result {
if fork >= root { if fork != last_notified_slot {
return Box::new(iter::once(RpcAccount::encode(account))); return (Box::new(iter::once(RpcAccount::encode(account))), fork);
} }
} }
Box::new(iter::empty()) (Box::new(iter::empty()), last_notified_slot)
} }
fn filter_signature_result( fn filter_signature_result(
result: Option<transaction::Result<()>>, result: Option<transaction::Result<()>>,
_root: Slot, last_notified_slot: Slot,
) -> Box<dyn Iterator<Item = RpcSignatureResult>> { ) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) {
Box::new( (
result Box::new(
.into_iter() result
.map(|result| RpcSignatureResult { err: result.err() }), .into_iter()
.map(|result| RpcSignatureResult { err: result.err() }),
),
last_notified_slot,
) )
} }
fn filter_program_results( fn filter_program_results(
accounts: Vec<(Pubkey, Account)>, accounts: Vec<(Pubkey, Account)>,
_root: Slot, last_notified_slot: Slot,
) -> Box<dyn Iterator<Item = RpcKeyedAccount>> { ) -> (Box<dyn Iterator<Item = RpcKeyedAccount>>, Slot) {
Box::new( (
accounts Box::new(
.into_iter() accounts
.map(|(pubkey, account)| RpcKeyedAccount { .into_iter()
pubkey: pubkey.to_string(), .map(|(pubkey, account)| RpcKeyedAccount {
account: RpcAccount::encode(account), pubkey: pubkey.to_string(),
}), account: RpcAccount::encode(account),
}),
),
last_notified_slot,
) )
} }
pub struct RpcSubscriptions { #[derive(Clone)]
struct Subscriptions {
account_subscriptions: Arc<RpcAccountSubscriptions>, account_subscriptions: Arc<RpcAccountSubscriptions>,
program_subscriptions: Arc<RpcProgramSubscriptions>, program_subscriptions: Arc<RpcProgramSubscriptions>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>, signature_subscriptions: Arc<RpcSignatureSubscriptions>,
slot_subscriptions: Arc<RpcSlotSubscriptions>, slot_subscriptions: Arc<RpcSlotSubscriptions>,
root_subscriptions: Arc<RpcRootSubscriptions>, root_subscriptions: Arc<RpcRootSubscriptions>,
}
pub struct RpcSubscriptions {
subscriptions: Subscriptions,
notification_sender: Arc<Mutex<Sender<NotificationEntry>>>, notification_sender: Arc<Mutex<Sender<NotificationEntry>>>,
t_cleanup: Option<JoinHandle<()>>, t_cleanup: Option<JoinHandle<()>>,
notifier_runtime: Option<Runtime>, notifier_runtime: Option<Runtime>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
} }
@ -257,6 +274,7 @@ impl Drop for RpcSubscriptions {
impl RpcSubscriptions { impl RpcSubscriptions {
pub fn new( pub fn new(
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
) -> Self { ) -> Self {
let (notification_sender, notification_receiver): ( let (notification_sender, notification_receiver): (
@ -271,12 +289,17 @@ impl RpcSubscriptions {
let root_subscriptions = Arc::new(RpcRootSubscriptions::default()); let root_subscriptions = Arc::new(RpcRootSubscriptions::default());
let notification_sender = Arc::new(Mutex::new(notification_sender)); let notification_sender = Arc::new(Mutex::new(notification_sender));
let _bank_forks = bank_forks.clone();
let _block_commitment_cache = block_commitment_cache.clone();
let exit_clone = exit.clone(); let exit_clone = exit.clone();
let account_subscriptions_clone = account_subscriptions.clone(); let subscriptions = Subscriptions {
let program_subscriptions_clone = program_subscriptions.clone(); account_subscriptions,
let signature_subscriptions_clone = signature_subscriptions.clone(); program_subscriptions,
let slot_subscriptions_clone = slot_subscriptions.clone(); signature_subscriptions,
let root_subscriptions_clone = root_subscriptions.clone(); slot_subscriptions,
root_subscriptions,
};
let _subscriptions = subscriptions.clone();
let notifier_runtime = RuntimeBuilder::new() let notifier_runtime = RuntimeBuilder::new()
.core_threads(1) .core_threads(1)
@ -292,32 +315,31 @@ impl RpcSubscriptions {
exit_clone, exit_clone,
notifier, notifier,
notification_receiver, notification_receiver,
account_subscriptions_clone, _subscriptions,
program_subscriptions_clone, _bank_forks,
signature_subscriptions_clone, _block_commitment_cache,
slot_subscriptions_clone,
root_subscriptions_clone,
block_commitment_cache,
); );
}) })
.unwrap(); .unwrap();
Self { Self {
account_subscriptions, subscriptions,
program_subscriptions,
signature_subscriptions,
slot_subscriptions,
root_subscriptions,
notification_sender, notification_sender,
notifier_runtime: Some(notifier_runtime), notifier_runtime: Some(notifier_runtime),
t_cleanup: Some(t_cleanup), t_cleanup: Some(t_cleanup),
bank_forks,
block_commitment_cache,
exit: exit.clone(), exit: exit.clone(),
} }
} }
pub fn default_with_blockstore(blockstore: Arc<Blockstore>) -> Self { pub fn default_with_blockstore_bank_forks(
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
Self::new( Self::new(
&Arc::new(AtomicBool::new(false)), &Arc::new(AtomicBool::new(false)),
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
blockstore, blockstore,
))), ))),
@ -332,12 +354,12 @@ impl RpcSubscriptions {
notifier: &RpcNotifier, notifier: &RpcNotifier,
) { ) {
let subscriptions = account_subscriptions.read().unwrap(); let subscriptions = account_subscriptions.read().unwrap();
check_confirmations_and_notify( check_commitment_and_notify(
&subscriptions, &subscriptions,
pubkey, pubkey,
bank_forks, bank_forks,
block_commitment_cache, block_commitment_cache,
Bank::get_account_modified_since_parent, Bank::get_account_modified_slot,
filter_account_result, filter_account_result,
notifier, notifier,
); );
@ -351,7 +373,7 @@ impl RpcSubscriptions {
notifier: &RpcNotifier, notifier: &RpcNotifier,
) { ) {
let subscriptions = program_subscriptions.read().unwrap(); let subscriptions = program_subscriptions.read().unwrap();
check_confirmations_and_notify( check_commitment_and_notify(
&subscriptions, &subscriptions,
program_id, program_id,
bank_forks, bank_forks,
@ -370,7 +392,7 @@ impl RpcSubscriptions {
notifier: &RpcNotifier, notifier: &RpcNotifier,
) { ) {
let mut subscriptions = signature_subscriptions.write().unwrap(); let mut subscriptions = signature_subscriptions.write().unwrap();
let notified_ids = check_confirmations_and_notify( let notified_ids = check_commitment_and_notify(
&subscriptions, &subscriptions,
signature, signature,
bank_forks, bank_forks,
@ -390,83 +412,109 @@ impl RpcSubscriptions {
pub fn add_account_subscription( pub fn add_account_subscription(
&self, &self,
pubkey: Pubkey, pubkey: Pubkey,
confirmations: Option<Confirmations>, commitment: Option<CommitmentConfig>,
sub_id: SubscriptionId, sub_id: SubscriptionId,
subscriber: Subscriber<Response<RpcAccount>>, subscriber: Subscriber<Response<RpcAccount>>,
) { ) {
let mut subscriptions = self.account_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap();
let slot = match commitment
.unwrap_or_else(CommitmentConfig::recent)
.commitment
{
CommitmentLevel::Max => self
.block_commitment_cache
.read()
.unwrap()
.largest_confirmed_root(),
CommitmentLevel::Recent => self.block_commitment_cache.read().unwrap().slot(),
CommitmentLevel::Root => self.block_commitment_cache.read().unwrap().root(),
};
let last_notified_slot = if let Some((_account, slot)) = self
.bank_forks
.read()
.unwrap()
.get(slot)
.and_then(|bank| bank.get_account_modified_slot(&pubkey))
{
slot
} else {
0
};
add_subscription( add_subscription(
&mut subscriptions, &mut subscriptions,
pubkey, pubkey,
confirmations, commitment,
sub_id, sub_id,
subscriber, subscriber,
last_notified_slot,
); );
} }
pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.account_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap();
remove_subscription(&mut subscriptions, id) remove_subscription(&mut subscriptions, id)
} }
pub fn add_program_subscription( pub fn add_program_subscription(
&self, &self,
program_id: Pubkey, program_id: Pubkey,
confirmations: Option<Confirmations>, commitment: Option<CommitmentConfig>,
sub_id: SubscriptionId, sub_id: SubscriptionId,
subscriber: Subscriber<Response<RpcKeyedAccount>>, subscriber: Subscriber<Response<RpcKeyedAccount>>,
) { ) {
let mut subscriptions = self.program_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap();
add_subscription( add_subscription(
&mut subscriptions, &mut subscriptions,
program_id, program_id,
confirmations, commitment,
sub_id, sub_id,
subscriber, subscriber,
0, // last_notified_slot is not utilized for program subscriptions
); );
} }
pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool { pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.program_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap();
remove_subscription(&mut subscriptions, id) remove_subscription(&mut subscriptions, id)
} }
pub fn add_signature_subscription( pub fn add_signature_subscription(
&self, &self,
signature: Signature, signature: Signature,
confirmations: Option<Confirmations>, commitment: Option<CommitmentConfig>,
sub_id: SubscriptionId, sub_id: SubscriptionId,
subscriber: Subscriber<Response<RpcSignatureResult>>, subscriber: Subscriber<Response<RpcSignatureResult>>,
) { ) {
let mut subscriptions = self.signature_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap();
add_subscription( add_subscription(
&mut subscriptions, &mut subscriptions,
signature, signature,
confirmations, commitment,
sub_id, sub_id,
subscriber, subscriber,
0, // last_notified_slot is not utilized for signature subscriptions
); );
} }
pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.signature_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap();
remove_subscription(&mut subscriptions, id) remove_subscription(&mut subscriptions, id)
} }
/// Notify subscribers of changes to any accounts or new signatures since /// Notify subscribers of changes to any accounts or new signatures since
/// the bank's last checkpoint. /// the bank's last checkpoint.
pub fn notify_subscribers(&self, current_slot: Slot, bank_forks: &Arc<RwLock<BankForks>>) { pub fn notify_subscribers(&self, current_slot: Slot) {
self.enqueue_notification(NotificationEntry::Bank((current_slot, bank_forks.clone()))); self.enqueue_notification(NotificationEntry::Bank(current_slot));
} }
pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<SlotInfo>) { pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<SlotInfo>) {
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let mut subscriptions = self.slot_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap();
subscriptions.insert(sub_id, sink); subscriptions.insert(sub_id, sink);
} }
pub fn remove_slot_subscription(&self, id: &SubscriptionId) -> bool { pub fn remove_slot_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.slot_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap();
subscriptions.remove(id).is_some() subscriptions.remove(id).is_some()
} }
@ -476,12 +524,12 @@ impl RpcSubscriptions {
pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) { pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) {
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let mut subscriptions = self.root_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap();
subscriptions.insert(sub_id, sink); subscriptions.insert(sub_id, sink);
} }
pub fn remove_root_subscription(&self, id: &SubscriptionId) -> bool { pub fn remove_root_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.root_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.root_subscriptions.write().unwrap();
subscriptions.remove(id).is_some() subscriptions.remove(id).is_some()
} }
@ -513,11 +561,8 @@ impl RpcSubscriptions {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
notifier: RpcNotifier, notifier: RpcNotifier,
notification_receiver: Receiver<NotificationEntry>, notification_receiver: Receiver<NotificationEntry>,
account_subscriptions: Arc<RpcAccountSubscriptions>, subscriptions: Subscriptions,
program_subscriptions: Arc<RpcProgramSubscriptions>, bank_forks: Arc<RwLock<BankForks>>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
slot_subscriptions: Arc<RpcSlotSubscriptions>,
root_subscriptions: Arc<RpcRootSubscriptions>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
) { ) {
loop { loop {
@ -527,20 +572,20 @@ impl RpcSubscriptions {
match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) { match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) {
Ok(notification_entry) => match notification_entry { Ok(notification_entry) => match notification_entry {
NotificationEntry::Slot(slot_info) => { NotificationEntry::Slot(slot_info) => {
let subscriptions = slot_subscriptions.read().unwrap(); let subscriptions = subscriptions.slot_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() { for (_, sink) in subscriptions.iter() {
notifier.notify(slot_info, sink); notifier.notify(slot_info, sink);
} }
} }
NotificationEntry::Root(root) => { NotificationEntry::Root(root) => {
let subscriptions = root_subscriptions.read().unwrap(); let subscriptions = subscriptions.root_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() { for (_, sink) in subscriptions.iter() {
notifier.notify(root, sink); notifier.notify(root, sink);
} }
} }
NotificationEntry::Bank((_current_slot, bank_forks)) => { NotificationEntry::Bank(_current_slot) => {
let pubkeys: Vec<_> = { let pubkeys: Vec<_> = {
let subs = account_subscriptions.read().unwrap(); let subs = subscriptions.account_subscriptions.read().unwrap();
subs.keys().cloned().collect() subs.keys().cloned().collect()
}; };
for pubkey in &pubkeys { for pubkey in &pubkeys {
@ -548,13 +593,13 @@ impl RpcSubscriptions {
pubkey, pubkey,
&bank_forks, &bank_forks,
&block_commitment_cache, &block_commitment_cache,
account_subscriptions.clone(), subscriptions.account_subscriptions.clone(),
&notifier, &notifier,
); );
} }
let programs: Vec<_> = { let programs: Vec<_> = {
let subs = program_subscriptions.read().unwrap(); let subs = subscriptions.program_subscriptions.read().unwrap();
subs.keys().cloned().collect() subs.keys().cloned().collect()
}; };
for program_id in &programs { for program_id in &programs {
@ -562,13 +607,13 @@ impl RpcSubscriptions {
program_id, program_id,
&bank_forks, &bank_forks,
&block_commitment_cache, &block_commitment_cache,
program_subscriptions.clone(), subscriptions.program_subscriptions.clone(),
&notifier, &notifier,
); );
} }
let signatures: Vec<_> = { let signatures: Vec<_> = {
let subs = signature_subscriptions.read().unwrap(); let subs = subscriptions.signature_subscriptions.read().unwrap();
subs.keys().cloned().collect() subs.keys().cloned().collect()
}; };
for signature in &signatures { for signature in &signatures {
@ -576,7 +621,7 @@ impl RpcSubscriptions {
signature, signature,
&bank_forks, &bank_forks,
&block_commitment_cache, &block_commitment_cache,
signature_subscriptions.clone(), subscriptions.signature_subscriptions.clone(),
&notifier, &notifier,
); );
} }
@ -671,7 +716,35 @@ pub(crate) mod tests {
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let alice = Keypair::new(); let alice = Keypair::new();
let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore_bank(
blockstore,
bank_forks.read().unwrap().get(1).unwrap().clone(),
1,
),
)),
);
subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber);
assert!(subscriptions
.subscriptions
.account_subscriptions
.read()
.unwrap()
.contains_key(&alice.pubkey()));
let tx = system_transaction::create_account( let tx = system_transaction::create_account(
&mint_keypair, &mint_keypair,
&alice, &alice,
@ -683,37 +756,18 @@ pub(crate) mod tests {
bank_forks bank_forks
.write() .write()
.unwrap() .unwrap()
.get(0) .get(1)
.unwrap() .unwrap()
.process_transaction(&tx) .process_transaction(&tx)
.unwrap(); .unwrap();
subscriptions.notify_subscribers(1);
let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore),
)),
);
subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber);
assert!(subscriptions
.account_subscriptions
.read()
.unwrap()
.contains_key(&alice.pubkey()));
subscriptions.notify_subscribers(0, &bank_forks);
let (response, _) = robust_poll_or_panic(transport_receiver); let (response, _) = robust_poll_or_panic(transport_receiver);
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "accountNotification", "method": "accountNotification",
"params": { "params": {
"result": { "result": {
"context": { "slot": 0 }, "context": { "slot": 1 },
"value": { "value": {
"data": "1111111111111111", "data": "1111111111111111",
"executable": false, "executable": false,
@ -729,6 +783,7 @@ pub(crate) mod tests {
subscriptions.remove_account_subscription(&sub_id); subscriptions.remove_account_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions
.subscriptions
.account_subscriptions .account_subscriptions
.read() .read()
.unwrap() .unwrap()
@ -771,6 +826,7 @@ pub(crate) mod tests {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new( let subscriptions = RpcSubscriptions::new(
&exit, &exit,
bank_forks,
Arc::new(RwLock::new( Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), BlockCommitmentCache::new_for_tests_with_blockstore(blockstore),
)), )),
@ -783,12 +839,13 @@ pub(crate) mod tests {
); );
assert!(subscriptions assert!(subscriptions
.subscriptions
.program_subscriptions .program_subscriptions
.read() .read()
.unwrap() .unwrap()
.contains_key(&solana_budget_program::id())); .contains_key(&solana_budget_program::id()));
subscriptions.notify_subscribers(0, &bank_forks); subscriptions.notify_subscribers(0);
let (response, _) = robust_poll_or_panic(transport_receiver); let (response, _) = robust_poll_or_panic(transport_receiver);
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -814,6 +871,7 @@ pub(crate) mod tests {
subscriptions.remove_program_subscription(&sub_id); subscriptions.remove_program_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions
.subscriptions
.program_subscriptions .program_subscriptions
.read() .read()
.unwrap() .unwrap()
@ -872,8 +930,11 @@ pub(crate) mod tests {
BlockCommitmentCache::new(block_commitment, 0, 10, bank1, blockstore, 0); BlockCommitmentCache::new(block_commitment, 0, 10, bank1, blockstore, 0);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = let subscriptions = RpcSubscriptions::new(
RpcSubscriptions::new(&exit, Arc::new(RwLock::new(block_commitment_cache))); &exit,
bank_forks,
Arc::new(RwLock::new(block_commitment_cache)),
);
let (past_bank_sub1, _id_receiver, past_bank_recv1) = let (past_bank_sub1, _id_receiver, past_bank_recv1) =
Subscriber::new_test("signatureNotification"); Subscriber::new_test("signatureNotification");
@ -884,37 +945,41 @@ pub(crate) mod tests {
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
past_bank_tx.signatures[0], past_bank_tx.signatures[0],
Some(0), Some(CommitmentConfig::recent()),
SubscriptionId::Number(1 as u64), SubscriptionId::Number(1 as u64),
past_bank_sub1, past_bank_sub1,
); );
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
past_bank_tx.signatures[0], past_bank_tx.signatures[0],
Some(1), Some(CommitmentConfig::root()),
SubscriptionId::Number(2 as u64), SubscriptionId::Number(2 as u64),
past_bank_sub2, past_bank_sub2,
); );
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
processed_tx.signatures[0], processed_tx.signatures[0],
Some(0), Some(CommitmentConfig::recent()),
SubscriptionId::Number(3 as u64), SubscriptionId::Number(3 as u64),
processed_sub, processed_sub,
); );
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
unprocessed_tx.signatures[0], unprocessed_tx.signatures[0],
Some(0), Some(CommitmentConfig::recent()),
SubscriptionId::Number(4 as u64), SubscriptionId::Number(4 as u64),
Subscriber::new_test("signatureNotification").0, Subscriber::new_test("signatureNotification").0,
); );
{ {
let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); let sig_subs = subscriptions
.subscriptions
.signature_subscriptions
.read()
.unwrap();
assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 2); assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 2);
assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0])); assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0]));
assert!(sig_subs.contains_key(&processed_tx.signatures[0])); assert!(sig_subs.contains_key(&processed_tx.signatures[0]));
} }
subscriptions.notify_subscribers(1, &bank_forks); subscriptions.notify_subscribers(1);
let expected_res = RpcSignatureResult { err: None }; let expected_res = RpcSignatureResult { err: None };
struct Notification { struct Notification {
@ -954,7 +1019,11 @@ pub(crate) mod tests {
assert_eq!(expected, response); assert_eq!(expected, response);
// Subscription should be automatically removed after notification // Subscription should be automatically removed after notification
let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); let sig_subs = subscriptions
.subscriptions
.signature_subscriptions
.read()
.unwrap();
assert!(!sig_subs.contains_key(&processed_tx.signatures[0])); assert!(!sig_subs.contains_key(&processed_tx.signatures[0]));
assert!(!sig_subs.contains_key(&past_bank_tx.signatures[0])); assert!(!sig_subs.contains_key(&past_bank_tx.signatures[0]));
@ -974,8 +1043,12 @@ pub(crate) mod tests {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let subscriptions = RpcSubscriptions::new( let subscriptions = RpcSubscriptions::new(
&exit, &exit,
bank_forks,
Arc::new(RwLock::new( Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), BlockCommitmentCache::new_for_tests_with_blockstore(blockstore),
)), )),
@ -983,6 +1056,7 @@ pub(crate) mod tests {
subscriptions.add_slot_subscription(sub_id.clone(), subscriber); subscriptions.add_slot_subscription(sub_id.clone(), subscriber);
assert!(subscriptions assert!(subscriptions
.subscriptions
.slot_subscriptions .slot_subscriptions
.read() .read()
.unwrap() .unwrap()
@ -1005,6 +1079,7 @@ pub(crate) mod tests {
subscriptions.remove_slot_subscription(&sub_id); subscriptions.remove_slot_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions
.subscriptions
.slot_subscriptions .slot_subscriptions
.read() .read()
.unwrap() .unwrap()
@ -1020,8 +1095,12 @@ pub(crate) mod tests {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let subscriptions = RpcSubscriptions::new( let subscriptions = RpcSubscriptions::new(
&exit, &exit,
bank_forks,
Arc::new(RwLock::new( Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), BlockCommitmentCache::new_for_tests_with_blockstore(blockstore),
)), )),
@ -1029,6 +1108,7 @@ pub(crate) mod tests {
subscriptions.add_root_subscription(sub_id.clone(), subscriber); subscriptions.add_root_subscription(sub_id.clone(), subscriber);
assert!(subscriptions assert!(subscriptions
.subscriptions
.root_subscriptions .root_subscriptions
.read() .read()
.unwrap() .unwrap()
@ -1050,6 +1130,7 @@ pub(crate) mod tests {
subscriptions.remove_root_subscription(&sub_id); subscriptions.remove_root_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions
.subscriptions
.root_subscriptions .root_subscriptions
.read() .read()
.unwrap() .unwrap()
@ -1059,7 +1140,7 @@ pub(crate) mod tests {
#[test] #[test]
#[serial] #[serial]
fn test_add_and_remove_subscription() { fn test_add_and_remove_subscription() {
let mut subscriptions: HashMap<u64, HashMap<SubscriptionId, (Sink<()>, Confirmations)>> = let mut subscriptions: HashMap<u64, HashMap<SubscriptionId, SubscriptionData<()>>> =
HashMap::new(); HashMap::new();
let num_keys = 5; let num_keys = 5;
@ -1067,7 +1148,7 @@ pub(crate) mod tests {
let (subscriber, _id_receiver, _transport_receiver) = let (subscriber, _id_receiver, _transport_receiver) =
Subscriber::new_test("notification"); Subscriber::new_test("notification");
let sub_id = SubscriptionId::Number(key); let sub_id = SubscriptionId::Number(key);
add_subscription(&mut subscriptions, key, None, sub_id, subscriber); add_subscription(&mut subscriptions, key, None, sub_id, subscriber, 0);
} }
// Add another subscription to the "0" key // Add another subscription to the "0" key
@ -1079,6 +1160,7 @@ pub(crate) mod tests {
None, None,
extra_sub_id.clone(), extra_sub_id.clone(),
subscriber, subscriber,
0,
); );
assert_eq!(subscriptions.len(), num_keys as usize); assert_eq!(subscriptions.len(), num_keys as usize);

View File

@ -304,11 +304,12 @@ pub mod tests {
BlockCommitmentCache::default_with_blockstore(blockstore.clone()), BlockCommitmentCache::default_with_blockstore(blockstore.clone()),
)); ));
let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let tvu = Tvu::new( let tvu = Tvu::new(
&vote_keypair.pubkey(), &vote_keypair.pubkey(),
vec![Arc::new(vote_keypair)], vec![Arc::new(vote_keypair)],
&storage_keypair, &storage_keypair,
&Arc::new(RwLock::new(bank_forks)), &bank_forks,
&cref1, &cref1,
{ {
Sockets { Sockets {
@ -321,7 +322,11 @@ pub mod tests {
blockstore, blockstore,
&StorageState::default(), &StorageState::default(),
l_receiver, l_receiver,
&Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone())), &Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
block_commitment_cache.clone(),
)),
&poh_recorder, &poh_recorder,
&leader_schedule_cache, &leader_schedule_cache,
&exit, &exit,

View File

@ -241,7 +241,11 @@ impl Validator {
BlockCommitmentCache::default_with_blockstore(blockstore.clone()), BlockCommitmentCache::default_with_blockstore(blockstore.clone()),
)); ));
let subscriptions = Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone())); let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
block_commitment_cache.clone(),
));
let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| {
if ContactInfo::is_valid_address(&node.info.rpc) { if ContactInfo::is_valid_address(&node.info.rpc) {

View File

@ -6,7 +6,13 @@ use solana_core::{
commitment::BlockCommitmentCache, rpc_pubsub_service::PubSubService, commitment::BlockCommitmentCache, rpc_pubsub_service::PubSubService,
rpc_subscriptions::RpcSubscriptions, validator::TestValidator, rpc_subscriptions::RpcSubscriptions, validator::TestValidator,
}; };
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; use solana_ledger::{
bank_forks::BankForks,
blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
};
use solana_runtime::bank::Bank;
use solana_sdk::{ use solana_sdk::{
commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::Signer, commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::Signer,
system_transaction, system_transaction,
@ -88,8 +94,12 @@ fn test_slot_subscription() {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let subscriptions = Arc::new(RpcSubscriptions::new( let subscriptions = Arc::new(RpcSubscriptions::new(
&exit, &exit,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore( Arc::new(RwLock::new(BlockCommitmentCache::default_with_blockstore(
blockstore, blockstore,
))), ))),

View File

@ -9,7 +9,7 @@ use reqwest::{self, header::CONTENT_TYPE};
use serde_json::{json, Value}; use serde_json::{json, Value};
use solana_client::{ use solana_client::{
rpc_client::{get_rpc_request_str, RpcClient}, rpc_client::{get_rpc_request_str, RpcClient},
rpc_response::{Response, RpcSignatureResult}, rpc_response::{Response, RpcAccount, RpcSignatureResult},
}; };
use solana_core::contact_info::ContactInfo; use solana_core::contact_info::ContactInfo;
use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator}; use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, validator::TestValidator};
@ -164,9 +164,15 @@ fn test_rpc_subscriptions() {
.iter() .iter()
.map(|tx| tx.signatures[0].to_string()) .map(|tx| tx.signatures[0].to_string())
.collect(); .collect();
let account_set: HashSet<String> = transactions
.iter()
.map(|tx| tx.message.account_keys[1].to_string())
.collect();
// Track when subscriptions are ready // Track when subscriptions are ready
let (ready_sender, ready_receiver) = channel::<()>(); let (ready_sender, ready_receiver) = channel::<()>();
// Track account notifications are received
let (account_sender, account_receiver) = channel::<Response<RpcAccount>>();
// Track when status notifications are received // Track when status notifications are received
let (status_sender, status_receiver) = channel::<(String, Response<RpcSignatureResult>)>(); let (status_sender, status_receiver) = channel::<(String, Response<RpcSignatureResult>)>();
@ -209,6 +215,22 @@ fn test_rpc_subscriptions() {
eprintln!("slot sub err: {:#?}", err); eprintln!("slot sub err: {:#?}", err);
}), }),
); );
for pubkey in account_set {
let account_sender = account_sender.clone();
tokio::spawn(
client
.account_subscribe(pubkey, None)
.and_then(move |account_stream| {
account_stream.for_each(move |result| {
account_sender.send(result).unwrap();
future::ok(())
})
})
.map_err(|err| {
eprintln!("acct sub err: {:#?}", err);
}),
);
}
future::ok(()) future::ok(())
}) })
.map_err(|_| ()) .map_err(|_| ())
@ -262,6 +284,26 @@ fn test_rpc_subscriptions() {
} }
} }
let deadline = Instant::now() + Duration::from_secs(5);
let mut account_notifications = transactions.len();
while account_notifications > 0 {
let timeout = deadline.saturating_duration_since(Instant::now());
match account_receiver.recv_timeout(timeout) {
Ok(result) => {
assert_eq!(result.value.lamports, 1);
account_notifications -= 1;
}
Err(_err) => {
assert!(
false,
"recv_timeout, {}/{} accounts remaining",
account_notifications,
transactions.len()
);
}
}
}
rt.shutdown_now().wait().unwrap(); rt.shutdown_now().wait().unwrap();
server.close().unwrap(); server.close().unwrap();
remove_dir_all(ledger_path).unwrap(); remove_dir_all(ledger_path).unwrap();

View File

@ -1816,12 +1816,14 @@ impl Bank {
} }
pub fn get_account(&self, pubkey: &Pubkey) -> Option<Account> { pub fn get_account(&self, pubkey: &Pubkey) -> Option<Account> {
self.rc self.get_account_modified_slot(pubkey)
.accounts
.load_slow(&self.ancestors, pubkey)
.map(|(acc, _slot)| acc) .map(|(acc, _slot)| acc)
} }
pub fn get_account_modified_slot(&self, pubkey: &Pubkey) -> Option<(Account, Slot)> {
self.rc.accounts.load_slow(&self.ancestors, pubkey)
}
// Exclude self to really fetch the parent Bank's account hash and data. // Exclude self to really fetch the parent Bank's account hash and data.
// //
// Being idempotent is needed to make the lazy initialization possible, // Being idempotent is needed to make the lazy initialization possible,