From 71f9b4468790726274f6116d9aa4b6b60985bba2 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 6 May 2019 08:31:50 -0600 Subject: [PATCH] Add Confirmations parameter to RPC Subscriptions (#4154) * Add optional depth parameter to pubsub, and store in subscriptions * Pass bank_forks into rpc_subscription; add method to check depth before notify and impl for account subscriptions * Impl check-depth for signature subscriptions * Impl check-depth for program subscriptions * Plumb fork id through accounts * Use fork id and root to prevent repeated account notifications; also s/Depth/Confirmations * Write tests in terms of bank_forks * Fixup accounts tests * Add pubsub-confirmations tests * Update pubsub documentation --- book/src/jsonrpc-api.md | 20 +++ core/src/replay_stage.rs | 2 +- core/src/rpc_pubsub.rs | 183 ++++++++++++++++++++----- core/src/rpc_subscriptions.rs | 244 ++++++++++++++++++++++++++-------- runtime/src/accounts.rs | 16 ++- runtime/src/accounts_db.rs | 70 ++++++---- runtime/src/accounts_index.rs | 20 +-- runtime/src/bank.rs | 7 +- 8 files changed, 430 insertions(+), 132 deletions(-) diff --git a/book/src/jsonrpc-api.md b/book/src/jsonrpc-api.md index d29e5ad2c..a4ad1a2b2 100644 --- a/book/src/jsonrpc-api.md +++ b/book/src/jsonrpc-api.md @@ -316,6 +316,14 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m After connect to the RPC PubSub websocket at `ws://
/`: - Submit subscription requests to the websocket using the methods below - Multiple subscriptions may be active at once +- All subscriptions take an optional `confirmations` parameter, which defines + how many confirmed blocks the node should wait before sending a notification. + The greater the number, the more likely the notification is to represent + consensus across the cluster, and the less likely it is to be affected by + forking or rollbacks. If unspecified, the default value is 0; the node will + send a notification as soon as it witnesses the event. The maximum + `confirmations` wait length is the cluster's `MAX_LOCKOUT_HISTORY`, which + represents the economic finality of the chain. --- @@ -325,6 +333,8 @@ for a given account public key changes ##### Parameters: * `string` - account Pubkey, as base-58 encoded string +* `integer` - optional, number of confirmed blocks to wait before notification. + Default: 0, Max: `MAX_LOCKOUT_HISTORY` (greater integers rounded down) ##### Results: * `integer` - Subscription id (needed to unsubscribe) @@ -334,6 +344,8 @@ for a given account public key changes // Request {"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12"]} +{"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12", 15]} + // Result {"jsonrpc": "2.0","result": 0,"id": 1} ``` @@ -371,6 +383,8 @@ for a given account owned by the program changes ##### Parameters: * `string` - program_id Pubkey, as base-58 encoded string +* `integer` - optional, number of confirmed blocks to wait before notification. + Default: 0, Max: `MAX_LOCKOUT_HISTORY` (greater integers rounded down) ##### Results: * `integer` - Subscription id (needed to unsubscribe) @@ -380,6 +394,8 @@ for a given account owned by the program changes // Request {"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["9gZbPtbtHrs6hEWgd6MbVY9VPFtS5Z8xKtnYwA2NynHV"]} +{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["9gZbPtbtHrs6hEWgd6MbVY9VPFtS5Z8xKtnYwA2NynHV", 15]} + // Result {"jsonrpc": "2.0","result": 0,"id": 1} ``` @@ -419,6 +435,8 @@ On `signatureNotification`, the subscription is automatically cancelled ##### Parameters: * `string` - Transaction Signature, as base-58 encoded string +* `integer` - optional, number of confirmed blocks to wait before notification. + Default: 0, Max: `MAX_LOCKOUT_HISTORY` (greater integers rounded down) ##### Results: * `integer` - subscription id (needed to unsubscribe) @@ -428,6 +446,8 @@ On `signatureNotification`, the subscription is automatically cancelled // Request {"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b"]} +{"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b", 15]} + // Result {"jsonrpc": "2.0","result": 0,"id": 1} ``` diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 8ebc2b48d..8bbb8d040 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -145,7 +145,7 @@ impl ReplayStage { Self::generate_votable_banks(&bank_forks, &locktower, &mut progress); if let Some((_, bank)) = votable.last() { - subscriptions.notify_subscribers(&bank); + subscriptions.notify_subscribers(bank.slot(), &bank_forks); Self::handle_votable_bank( &bank, diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index f8f14a4e9..ef16b9d14 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -1,6 +1,6 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::rpc_subscriptions::RpcSubscriptions; +use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions}; use bs58; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; @@ -24,7 +24,13 @@ pub trait RpcSolPubSub { subscribe, name = "accountSubscribe" )] - fn account_subscribe(&self, _: Self::Metadata, _: Subscriber, _: String); + fn account_subscribe( + &self, + _: Self::Metadata, + _: Subscriber, + _: String, + _: Option, + ); // Unsubscribe from account notification subscription. #[pubsub( @@ -41,7 +47,13 @@ pub trait RpcSolPubSub { subscribe, name = "programSubscribe" )] - fn program_subscribe(&self, _: Self::Metadata, _: Subscriber<(String, Account)>, _: String); + fn program_subscribe( + &self, + _: Self::Metadata, + _: Subscriber<(String, Account)>, + _: String, + _: Option, + ); // Unsubscribe from account notification subscription. #[pubsub( @@ -61,8 +73,9 @@ pub trait RpcSolPubSub { fn signature_subscribe( &self, _: Self::Metadata, - _: Subscriber>>, + _: Subscriber>, _: String, + _: Option, ); // Unsubscribe from signature notification subscription. @@ -95,6 +108,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { _meta: Self::Metadata, subscriber: Subscriber, pubkey_str: String, + confirmations: Option, ) { let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap(); if pubkey_vec.len() != mem::size_of::() { @@ -115,7 +129,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); self.subscriptions - .add_account_subscription(&pubkey, &sub_id, &sink) + .add_account_subscription(&pubkey, confirmations, &sub_id, &sink) } fn account_unsubscribe( @@ -140,6 +154,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { _meta: Self::Metadata, subscriber: Subscriber<(String, Account)>, pubkey_str: String, + confirmations: Option, ) { let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap(); if pubkey_vec.len() != mem::size_of::() { @@ -160,7 +175,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); self.subscriptions - .add_program_subscription(&pubkey, &sub_id, &sink) + .add_program_subscription(&pubkey, confirmations, &sub_id, &sink) } fn program_unsubscribe( @@ -183,8 +198,9 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn signature_subscribe( &self, _meta: Self::Metadata, - subscriber: Subscriber>>, + subscriber: Subscriber>, signature_str: String, + confirmations: Option, ) { info!("signature_subscribe"); let signature_vec = bs58::decode(signature_str).into_vec().unwrap(); @@ -199,12 +215,17 @@ impl RpcSolPubSub for RpcSolPubSubImpl { return; } let signature = Signature::new(&signature_vec); + let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst); let sub_id = SubscriptionId::Number(id as u64); + info!( + "signature_subscribe: signature={:?} id={:?}", + signature, sub_id + ); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); self.subscriptions - .add_signature_subscription(&signature, &sub_id, &sink); + .add_signature_subscription(&signature, confirmations, &sub_id, &sink); } fn signature_unsubscribe( @@ -228,6 +249,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { #[cfg(test)] mod tests { use super::*; + use crate::bank_forks::BankForks; use jsonrpc_core::futures::sync::mpsc; use jsonrpc_core::Response; use jsonrpc_pubsub::{PubSubHandler, Session}; @@ -237,26 +259,27 @@ mod tests { use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::system_program; use solana_sdk::system_transaction; use solana_sdk::transaction::{self, Transaction}; + use std::sync::RwLock; use std::thread::sleep; use std::time::Duration; use tokio::prelude::{Async, Stream}; fn process_transaction_and_notify( - bank: &Arc, + bank_forks: &Arc>, tx: &Transaction, subscriptions: &RpcSubscriptions, - ) -> transaction::Result> { - bank.process_transaction(tx)?; - subscriptions.notify_subscribers(&bank); - - // Simulate a block boundary - Ok(Arc::new(Bank::new_from_parent( - &bank, - &Pubkey::default(), - bank.slot() + 1, - ))) + ) -> transaction::Result<()> { + bank_forks + .write() + .unwrap() + .get(0) + .unwrap() + .process_transaction(tx)?; + subscriptions.notify_subscribers(0, &bank_forks); + Ok(()) } fn create_session() -> Arc { @@ -269,8 +292,8 @@ mod tests { let bob = Keypair::new(); let bob_pubkey = bob.pubkey(); let bank = Bank::new(&genesis_block); - let arc_bank = Arc::new(bank); - let blockhash = arc_bank.last_blockhash(); + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let rpc = RpcSolPubSubImpl::default(); @@ -280,9 +303,9 @@ mod tests { let session = create_session(); let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("signatureNotification"); - rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string()); + rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None); - process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); + process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); // Test signature confirmation notification @@ -354,13 +377,18 @@ mod tests { let budget_program_id = solana_budget_api::id(); let executable = false; // TODO let bank = Bank::new(&genesis_block); - let arc_bank = Arc::new(bank); - let blockhash = arc_bank.last_blockhash(); + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let rpc = RpcSolPubSubImpl::default(); let session = create_session(); let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); - rpc.account_subscribe(session, subscriber, contract_state.pubkey().to_string()); + rpc.account_subscribe( + session, + subscriber, + contract_state.pubkey().to_string(), + None, + ); let tx = system_transaction::create_user_account( &alice, @@ -369,7 +397,7 @@ mod tests { blockhash, 0, ); - let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); + process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); let ixs = budget_instruction::when_signed( &contract_funds.pubkey(), @@ -380,12 +408,19 @@ mod tests { 51, ); let tx = Transaction::new_signed_instructions(&[&contract_funds], ixs, blockhash); - let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); + process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); // Test signature confirmation notification #1 let string = receiver.poll(); - let expected_data = arc_bank.get_account(&contract_state.pubkey()).unwrap().data; + let expected_data = bank_forks + .read() + .unwrap() + .get(0) + .unwrap() + .get_account(&contract_state.pubkey()) + .unwrap() + .data; let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", @@ -406,7 +441,7 @@ mod tests { let tx = system_transaction::create_user_account(&alice, &witness.pubkey(), 1, blockhash, 0); - let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); + process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); let ix = budget_instruction::apply_signature( &witness.pubkey(), @@ -414,10 +449,18 @@ mod tests { &bob_pubkey, ); let tx = Transaction::new_signed_instructions(&[&witness], vec![ix], blockhash); - let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); + process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); - assert_eq!(arc_bank.get_account(&contract_state.pubkey()), None); + assert_eq!( + bank_forks + .read() + .unwrap() + .get(0) + .unwrap() + .get_account(&contract_state.pubkey()), + None + ); } #[test] @@ -456,4 +499,80 @@ mod tests { let result: Response = serde_json::from_str(&res.unwrap()).unwrap(); assert_eq!(expected, result); } + + #[test] + #[should_panic] + fn test_account_confirmations_not_fulfilled() { + let (genesis_block, alice) = GenesisBlock::new(10_000); + let bank = Bank::new(&genesis_block); + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let bob = Keypair::new(); + + let rpc = RpcSolPubSubImpl::default(); + let session = create_session(); + let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); + rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); + + let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash, 0); + bank_forks + .write() + .unwrap() + .get(0) + .unwrap() + .process_transaction(&tx) + .unwrap(); + rpc.subscriptions.notify_subscribers(0, &bank_forks); + let _panic = receiver.poll(); + } + + #[test] + fn test_account_confirmations() { + let (genesis_block, alice) = GenesisBlock::new(10_000); + let bank = Bank::new(&genesis_block); + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let bob = Keypair::new(); + + let rpc = RpcSolPubSubImpl::default(); + let session = create_session(); + let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); + rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); + + let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash, 0); + bank_forks + .write() + .unwrap() + .get(0) + .unwrap() + .process_transaction(&tx) + .unwrap(); + rpc.subscriptions.notify_subscribers(0, &bank_forks); + + let bank0 = bank_forks.read().unwrap()[0].clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); + rpc.subscriptions.notify_subscribers(1, &bank_forks); + let bank1 = bank_forks.read().unwrap()[1].clone(); + let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); + bank_forks.write().unwrap().insert(bank2); + rpc.subscriptions.notify_subscribers(2, &bank_forks); + let string = receiver.poll(); + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "owner": system_program::id(), + "lamports": 100, + "data": [], + "executable": false, + }, + "subscription": 0, + } + }); + if let Async::Ready(Some(response)) = string.unwrap() { + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + } + } } diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 16ebed163..9c9b5709f 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -1,44 +1,58 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request +use crate::bank_forks::BankForks; use bs58; use core::hash::Hash; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::typed::Sink; use jsonrpc_pubsub::SubscriptionId; +use serde::Serialize; use solana_runtime::bank::Bank; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; use solana_sdk::transaction; +use solana_vote_api::vote_state::MAX_LOCKOUT_HISTORY; use std::collections::HashMap; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; -type RpcAccountSubscriptions = RwLock>>>; +pub type Confirmations = usize; + +type RpcAccountSubscriptions = + RwLock, Confirmations)>>>; type RpcProgramSubscriptions = - RwLock>>>; -type RpcSignatureSubscriptions = - RwLock>>>>>; + RwLock, Confirmations)>>>; +type RpcSignatureSubscriptions = RwLock< + HashMap>, Confirmations)>>, +>; fn add_subscription( - subscriptions: &mut HashMap>>, + subscriptions: &mut HashMap, Confirmations)>>, hashmap_key: &K, + confirmations: Option, sub_id: &SubscriptionId, sink: &Sink, ) where K: Eq + Hash + Clone + Copy, S: Clone, { + let confirmations = confirmations.unwrap_or(0); + let confirmations = if confirmations > MAX_LOCKOUT_HISTORY { + MAX_LOCKOUT_HISTORY + } else { + confirmations + }; if let Some(current_hashmap) = subscriptions.get_mut(hashmap_key) { - current_hashmap.insert(sub_id.clone(), sink.clone()); + current_hashmap.insert(sub_id.clone(), (sink.clone(), confirmations)); return; } let mut hashmap = HashMap::new(); - hashmap.insert(sub_id.clone(), sink.clone()); + hashmap.insert(sub_id.clone(), (sink.clone(), confirmations)); subscriptions.insert(*hashmap_key, hashmap); } fn remove_subscription( - subscriptions: &mut HashMap>>, + subscriptions: &mut HashMap, Confirmations)>>, sub_id: &SubscriptionId, ) -> bool where @@ -58,6 +72,84 @@ where found } +fn check_confirmations_and_notify( + subscriptions: &HashMap, Confirmations)>>, + hashmap_key: &K, + current_slot: u64, + bank_forks: &Arc>, + bank_method: F, + notify: N, +) where + K: Eq + Hash + Clone + Copy, + S: Clone + Serialize, + F: Fn(&Bank, &K) -> X, + N: Fn(X, &Sink, u64), + X: Clone + Serialize, +{ + let current_ancestors = bank_forks + .read() + .unwrap() + .get(current_slot) + .unwrap() + .ancestors + .clone(); + if let Some(hashmap) = subscriptions.get(hashmap_key) { + for (_bank_sub_id, (sink, confirmations)) in hashmap.iter() { + let desired_slot: Vec = current_ancestors + .iter() + .filter(|(_, &v)| v == *confirmations) + .map(|(k, _)| k) + .cloned() + .collect(); + let root: Vec = current_ancestors + .iter() + .filter(|(_, &v)| v == 32) + .map(|(k, _)| k) + .cloned() + .collect(); + let root = if root.len() == 1 { root[0] } else { 0 }; + if desired_slot.len() == 1 { + let desired_bank = bank_forks + .read() + .unwrap() + .get(desired_slot[0]) + .unwrap() + .clone(); + let result = bank_method(&desired_bank, hashmap_key); + notify(result, &sink, root); + } + } + } +} + +fn notify_account(result: Option<(S, u64)>, sink: &Sink, root: u64) +where + S: Clone + Serialize, +{ + if let Some((account, fork)) = result { + if fork >= root { + sink.notify(Ok(account)).wait().unwrap(); + } + } +} + +fn notify_signature(result: Option, sink: &Sink, _root: u64) +where + S: Clone + Serialize, +{ + if let Some(result) = result { + sink.notify(Ok(result)).wait().unwrap(); + } +} + +fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink<(String, Account)>, _root: u64) { + for (pubkey, account) in accounts.iter() { + sink.notify(Ok((bs58::encode(pubkey).into_string(), account.clone()))) + .wait() + .unwrap(); + } +} + pub struct RpcSubscriptions { account_subscriptions: RpcAccountSubscriptions, program_subscriptions: RpcProgramSubscriptions, @@ -75,44 +167,67 @@ impl Default for RpcSubscriptions { } impl RpcSubscriptions { - pub fn check_account(&self, pubkey: &Pubkey, account: &Account) { + pub fn check_account( + &self, + pubkey: &Pubkey, + current_slot: u64, + bank_forks: &Arc>, + ) { let subscriptions = self.account_subscriptions.read().unwrap(); - if let Some(hashmap) = subscriptions.get(pubkey) { - for (_bank_sub_id, sink) in hashmap.iter() { - sink.notify(Ok(account.clone())).wait().unwrap(); - } - } + check_confirmations_and_notify( + &subscriptions, + pubkey, + current_slot, + bank_forks, + Bank::get_account_modified_since_parent, + notify_account, + ); } - pub fn check_program(&self, program_id: &Pubkey, pubkey: &Pubkey, account: &Account) { + pub fn check_program( + &self, + program_id: &Pubkey, + current_slot: u64, + bank_forks: &Arc>, + ) { let subscriptions = self.program_subscriptions.write().unwrap(); - if let Some(hashmap) = subscriptions.get(program_id) { - for (_bank_sub_id, sink) in hashmap.iter() { - sink.notify(Ok((bs58::encode(pubkey).into_string(), account.clone()))) - .wait() - .unwrap(); - } - } + check_confirmations_and_notify( + &subscriptions, + program_id, + current_slot, + bank_forks, + Bank::get_program_accounts_modified_since_parent, + notify_program, + ); } - pub fn check_signature(&self, signature: &Signature, bank_error: &transaction::Result<()>) { + pub fn check_signature( + &self, + signature: &Signature, + current_slot: u64, + bank_forks: &Arc>, + ) { let mut subscriptions = self.signature_subscriptions.write().unwrap(); - if let Some(hashmap) = subscriptions.get(signature) { - for (_bank_sub_id, sink) in hashmap.iter() { - sink.notify(Ok(Some(bank_error.clone()))).wait().unwrap(); - } - } + check_confirmations_and_notify( + &subscriptions, + signature, + current_slot, + bank_forks, + Bank::get_signature_status, + notify_signature, + ); subscriptions.remove(&signature); } pub fn add_account_subscription( &self, pubkey: &Pubkey, + confirmations: Option, sub_id: &SubscriptionId, sink: &Sink, ) { let mut subscriptions = self.account_subscriptions.write().unwrap(); - add_subscription(&mut subscriptions, pubkey, sub_id, sink); + add_subscription(&mut subscriptions, pubkey, confirmations, sub_id, sink); } pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { @@ -123,11 +238,12 @@ impl RpcSubscriptions { pub fn add_program_subscription( &self, program_id: &Pubkey, + confirmations: Option, sub_id: &SubscriptionId, sink: &Sink<(String, Account)>, ) { let mut subscriptions = self.program_subscriptions.write().unwrap(); - add_subscription(&mut subscriptions, program_id, sub_id, sink); + add_subscription(&mut subscriptions, program_id, confirmations, sub_id, sink); } pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool { @@ -138,11 +254,12 @@ impl RpcSubscriptions { pub fn add_signature_subscription( &self, signature: &Signature, + confirmations: Option, sub_id: &SubscriptionId, - sink: &Sink>>, + sink: &Sink>, ) { let mut subscriptions = self.signature_subscriptions.write().unwrap(); - add_subscription(&mut subscriptions, signature, sub_id, sink); + add_subscription(&mut subscriptions, signature, confirmations, sub_id, sink); } pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { @@ -152,15 +269,13 @@ impl RpcSubscriptions { /// Notify subscribers of changes to any accounts or new signatures since /// the bank's last checkpoint. - pub fn notify_subscribers(&self, bank: &Bank) { + pub fn notify_subscribers(&self, current_slot: u64, bank_forks: &Arc>) { let pubkeys: Vec<_> = { let subs = self.account_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; for pubkey in &pubkeys { - if let Some(account) = &bank.get_account_modified_since_parent(pubkey) { - self.check_account(pubkey, account); - } + self.check_account(pubkey, current_slot, bank_forks); } let programs: Vec<_> = { @@ -168,10 +283,7 @@ impl RpcSubscriptions { subs.keys().cloned().collect() }; for program_id in &programs { - let accounts = &bank.get_program_accounts_modified_since_parent(program_id); - for (pubkey, account) in accounts.iter() { - self.check_program(program_id, pubkey, account); - } + self.check_program(program_id, current_slot, bank_forks); } let signatures: Vec<_> = { @@ -179,8 +291,7 @@ impl RpcSubscriptions { subs.keys().cloned().collect() }; for signature in &signatures { - let status = bank.get_signature_status(signature).unwrap(); - self.check_signature(signature, &status); + self.check_signature(signature, current_slot, bank_forks); } } } @@ -199,8 +310,9 @@ mod tests { fn test_check_account_subscribe() { let (genesis_block, mint_keypair) = GenesisBlock::new(100); let bank = Bank::new(&genesis_block); - let alice = Keypair::new(); let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let alice = Keypair::new(); let tx = system_transaction::create_account( &mint_keypair, &alice.pubkey(), @@ -210,14 +322,20 @@ mod tests { &solana_budget_api::id(), 0, ); - bank.process_transaction(&tx).unwrap(); + bank_forks + .write() + .unwrap() + .get(0) + .unwrap() + .process_transaction(&tx) + .unwrap(); let (subscriber, _id_receiver, mut transport_receiver) = Subscriber::new_test("accountNotification"); let sub_id = SubscriptionId::Number(0 as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let subscriptions = RpcSubscriptions::default(); - subscriptions.add_account_subscription(&alice.pubkey(), &sub_id, &sink); + subscriptions.add_account_subscription(&alice.pubkey(), None, &sub_id, &sink); assert!(subscriptions .account_subscriptions @@ -225,8 +343,7 @@ mod tests { .unwrap() .contains_key(&alice.pubkey())); - let account = bank.get_account(&alice.pubkey()).unwrap(); - subscriptions.check_account(&alice.pubkey(), &account); + subscriptions.check_account(&alice.pubkey(), 0, &bank_forks); let string = transport_receiver.poll(); if let Async::Ready(Some(response)) = string.unwrap() { let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"data":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"executable":false,"lamports":1,"owner":[2,203,81,223,225,24,34,35,203,214,138,130,144,208,35,77,63,16,87,51,47,198,115,123,98,188,19,160,0,0,0,0]}},"subscription":0}}}}"#); @@ -245,8 +362,9 @@ mod tests { fn test_check_program_subscribe() { let (genesis_block, mint_keypair) = GenesisBlock::new(100); let bank = Bank::new(&genesis_block); - let alice = Keypair::new(); let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let alice = Keypair::new(); let tx = system_transaction::create_account( &mint_keypair, &alice.pubkey(), @@ -256,14 +374,20 @@ mod tests { &solana_budget_api::id(), 0, ); - bank.process_transaction(&tx).unwrap(); + bank_forks + .write() + .unwrap() + .get(0) + .unwrap() + .process_transaction(&tx) + .unwrap(); let (subscriber, _id_receiver, mut transport_receiver) = Subscriber::new_test("programNotification"); let sub_id = SubscriptionId::Number(0 as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let subscriptions = RpcSubscriptions::default(); - subscriptions.add_program_subscription(&solana_budget_api::id(), &sub_id, &sink); + subscriptions.add_program_subscription(&solana_budget_api::id(), None, &sub_id, &sink); assert!(subscriptions .program_subscriptions @@ -271,8 +395,7 @@ mod tests { .unwrap() .contains_key(&solana_budget_api::id())); - let account = bank.get_account(&alice.pubkey()).unwrap(); - subscriptions.check_program(&solana_budget_api::id(), &alice.pubkey(), &account); + subscriptions.check_program(&solana_budget_api::id(), 0, &bank_forks); let string = transport_receiver.poll(); if let Async::Ready(Some(response)) = string.unwrap() { let expected = format!(r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":["{:?}",{{"data":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"executable":false,"lamports":1,"owner":[2,203,81,223,225,24,34,35,203,214,138,130,144,208,35,77,63,16,87,51,47,198,115,123,98,188,19,160,0,0,0,0]}}],"subscription":0}}}}"#, alice.pubkey()); @@ -290,18 +413,25 @@ mod tests { fn test_check_signature_subscribe() { let (genesis_block, mint_keypair) = GenesisBlock::new(100); let bank = Bank::new(&genesis_block); - let alice = Keypair::new(); let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let alice = Keypair::new(); let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 20, blockhash, 0); let signature = tx.signatures[0]; - bank.process_transaction(&tx).unwrap(); + bank_forks + .write() + .unwrap() + .get(0) + .unwrap() + .process_transaction(&tx) + .unwrap(); let (subscriber, _id_receiver, mut transport_receiver) = Subscriber::new_test("signatureNotification"); let sub_id = SubscriptionId::Number(0 as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let subscriptions = RpcSubscriptions::default(); - subscriptions.add_signature_subscription(&signature, &sub_id, &sink); + subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink); assert!(subscriptions .signature_subscriptions @@ -309,7 +439,7 @@ mod tests { .unwrap() .contains_key(&signature)); - subscriptions.check_signature(&signature, &Ok(())); + subscriptions.check_signature(&signature, 0, &bank_forks); let string = transport_receiver.poll(); if let Async::Ready(Some(response)) = string.unwrap() { let expected_res: Option> = Some(Ok(())); diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index a6af53560..17c096f57 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -163,7 +163,9 @@ impl Accounts { let mut called_accounts: Vec = vec![]; for key in &message.account_keys { called_accounts.push( - AccountsDB::load(storage, ancestors, accounts_index, key).unwrap_or_default(), + AccountsDB::load(storage, ancestors, accounts_index, key) + .map(|(account, _)| account) + .unwrap_or_default(), ); } if called_accounts.is_empty() || called_accounts[0].lamports == 0 { @@ -201,7 +203,9 @@ impl Accounts { } depth += 1; - let program = match AccountsDB::load(storage, ancestors, accounts_index, &program_id) { + let program = match AccountsDB::load(storage, ancestors, accounts_index, &program_id) + .map(|(account, _)| account) + { Some(program) => program, None => { error_counters.account_not_found += 1; @@ -289,10 +293,14 @@ impl Accounts { } /// Slow because lock is held for 1 operation instead of many - pub fn load_slow(&self, ancestors: &HashMap, pubkey: &Pubkey) -> Option { + pub fn load_slow( + &self, + ancestors: &HashMap, + pubkey: &Pubkey, + ) -> Option<(Account, Fork)> { self.accounts_db .load_slow(ancestors, pubkey) - .filter(|acc| acc.lamports != 0) + .filter(|(acc, _)| acc.lamports != 0) } pub fn load_by_program(&self, fork: Fork, program_id: &Pubkey) -> Vec<(Pubkey, Account)> { diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index a71907940..e3b848b11 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -259,15 +259,20 @@ impl AccountsDB { ancestors: &HashMap, accounts_index: &AccountsIndex, pubkey: &Pubkey, - ) -> Option { - let info = accounts_index.get(pubkey, ancestors)?; + ) -> Option<(Account, Fork)> { + let (info, fork) = accounts_index.get(pubkey, ancestors)?; //TODO: thread this as a ref storage .get(&info.id) .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) + .map(|account| (account, fork)) } - pub fn load_slow(&self, ancestors: &HashMap, pubkey: &Pubkey) -> Option { + pub fn load_slow( + &self, + ancestors: &HashMap, + pubkey: &Pubkey, + ) -> Option<(Account, Fork)> { let accounts_index = self.accounts_index.read().unwrap(); let storage = self.storage.read().unwrap(); Self::load(&storage, ancestors, &accounts_index, pubkey) @@ -480,7 +485,7 @@ mod tests { db.store(0, &[(&key, &account0)]); db.add_root(0); let ancestors = vec![(1, 1)].into_iter().collect(); - assert_eq!(db.load_slow(&ancestors, &key), Some(account0)); + assert_eq!(db.load_slow(&ancestors, &key), Some((account0, 0))); } #[test] @@ -497,10 +502,10 @@ mod tests { db.store(1, &[(&key, &account1)]); let ancestors = vec![(1, 1)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1); let ancestors = vec![(1, 1), (0, 0)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1); } #[test] @@ -518,10 +523,10 @@ mod tests { db.add_root(0); let ancestors = vec![(1, 1)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1); let ancestors = vec![(1, 1), (0, 0)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1); } #[test] @@ -552,18 +557,18 @@ mod tests { // original account (but could also accept "None", which is implemented // at the Accounts level) let ancestors = vec![(0, 0), (1, 1)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1); // we should see 1 token in fork 2 let ancestors = vec![(0, 0), (2, 2)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account0); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account0); db.add_root(0); let ancestors = vec![(1, 1)].into_iter().collect(); - assert_eq!(db.load_slow(&ancestors, &key), Some(account1)); + assert_eq!(db.load_slow(&ancestors, &key), Some((account1, 1))); let ancestors = vec![(2, 2)].into_iter().collect(); - assert_eq!(db.load_slow(&ancestors, &key), Some(account0)); // original value + assert_eq!(db.load_slow(&ancestors, &key), Some((account0, 0))); // original value } #[test] @@ -579,7 +584,7 @@ mod tests { let account = db.load_slow(&ancestors, &pubkeys[idx]).unwrap(); let mut default_account = Account::default(); default_account.lamports = (idx + 1) as u64; - assert_eq!(default_account, account); + assert_eq!((default_account, 0), account); } db.add_root(0); @@ -593,8 +598,8 @@ mod tests { let account1 = db.load_slow(&ancestors, &pubkeys[idx]).unwrap(); let mut default_account = Account::default(); default_account.lamports = (idx + 1) as u64; - assert_eq!(&default_account, &account0); - assert_eq!(&default_account, &account1); + assert_eq!(&default_account, &account0.0); + assert_eq!(&default_account, &account1.0); } } @@ -650,9 +655,9 @@ mod tests { // masking accounts is done at the Accounts level, at accountsDB we see // original account let ancestors = vec![(0, 0), (1, 1)].into_iter().collect(); - assert_eq!(db0.load_slow(&ancestors, &key), Some(account1)); + assert_eq!(db0.load_slow(&ancestors, &key), Some((account1, 1))); let ancestors = vec![(0, 0)].into_iter().collect(); - assert_eq!(db0.load_slow(&ancestors, &key), Some(account0)); + assert_eq!(db0.load_slow(&ancestors, &key), Some((account0, 0))); } fn create_account( @@ -685,7 +690,7 @@ mod tests { for _ in 1..1000 { let idx = thread_rng().gen_range(0, range); let ancestors = vec![(fork, 0)].into_iter().collect(); - if let Some(mut account) = accounts.load_slow(&ancestors, &pubkeys[idx]) { + if let Some((mut account, _)) = accounts.load_slow(&ancestors, &pubkeys[idx]) { account.lamports = account.lamports + 1; accounts.store(fork, &[(&pubkeys[idx], &account)]); if account.lamports == 0 { @@ -714,7 +719,7 @@ mod tests { let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap(); let mut default_account = Account::default(); default_account.lamports = (idx + 1) as u64; - assert_eq!(default_account, account); + assert_eq!((default_account, 0), account); } } @@ -728,7 +733,7 @@ mod tests { let account = accounts.load_slow(&ancestors, &pubkeys[0]).unwrap(); let mut default_account = Account::default(); default_account.lamports = 1; - assert_eq!(default_account, account); + assert_eq!((default_account, 0), account); } #[test] @@ -765,7 +770,7 @@ mod tests { for (i, key) in keys.iter().enumerate() { let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!( - accounts.load_slow(&ancestors, &key).unwrap().lamports, + accounts.load_slow(&ancestors, &key).unwrap().0.lamports, (i as u64) + 1 ); } @@ -810,8 +815,14 @@ mod tests { assert_eq!(stores[&1].status(), AccountStorageStatus::StorageAvailable); } let ancestors = vec![(0, 0)].into_iter().collect(); - assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); - assert_eq!(accounts.load_slow(&ancestors, &pubkey2).unwrap(), account2); + assert_eq!( + accounts.load_slow(&ancestors, &pubkey1).unwrap().0, + account1 + ); + assert_eq!( + accounts.load_slow(&ancestors, &pubkey2).unwrap().0, + account2 + ); // lots of stores, but 3 storages should be enough for everything for i in 0..25 { @@ -828,8 +839,14 @@ mod tests { assert_eq!(stores[&2].status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); - assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); - assert_eq!(accounts.load_slow(&ancestors, &pubkey2).unwrap(), account2); + assert_eq!( + accounts.load_slow(&ancestors, &pubkey1).unwrap().0, + account1 + ); + assert_eq!( + accounts.load_slow(&ancestors, &pubkey2).unwrap().0, + account2 + ); } } @@ -875,6 +892,7 @@ mod tests { .unwrap() .get(&pubkey, &ancestors) .unwrap() + .0 .clone(); //fork 0 is behind root, but it is not root, therefore it is purged accounts.add_root(1); @@ -891,7 +909,7 @@ mod tests { //new value is there let ancestors = vec![(1, 1)].into_iter().collect(); - assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some(account));; + assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1))); } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index c4dec064b..46a258231 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -15,21 +15,21 @@ pub struct AccountsIndex { impl AccountsIndex { /// Get an account /// The latest account that appears in `ancestors` or `roots` is returned. - pub fn get(&self, pubkey: &Pubkey, ancestors: &HashMap) -> Option<&T> { + pub fn get(&self, pubkey: &Pubkey, ancestors: &HashMap) -> Option<(&T, Fork)> { let list = self.account_maps.get(pubkey)?; let mut max = 0; let mut rv = None; for e in list.iter().rev() { if e.0 >= max && (ancestors.get(&e.0).is_some() || self.is_root(e.0)) { trace!("GET {} {:?}", e.0, ancestors); - rv = Some(&e.1); + rv = Some((&e.1, e.0)); max = e.0; } } rv } - /// Insert a new fork. + /// Insert a new fork. /// @retval - The return value contains any squashed accounts that can freed from storage. pub fn insert(&mut self, fork: Fork, pubkey: &Pubkey, account_info: T) -> Vec<(Fork, T)> { let mut rv = vec![]; @@ -123,7 +123,7 @@ mod tests { assert!(gc.is_empty()); let ancestors = vec![(0, 0)].into_iter().collect(); - assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&true)); + assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0))); } #[test] @@ -143,7 +143,7 @@ mod tests { let ancestors = vec![].into_iter().collect(); index.add_root(0); - assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&true)); + assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0))); } #[test] @@ -199,11 +199,11 @@ mod tests { let ancestors = vec![(0, 0)].into_iter().collect(); let gc = index.insert(0, &key.pubkey(), true); assert!(gc.is_empty()); - assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&true)); + assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0))); let gc = index.insert(0, &key.pubkey(), false); assert_eq!(gc, vec![(0, true)]); - assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&false)); + assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&false, 0))); } #[test] @@ -215,9 +215,9 @@ mod tests { assert!(gc.is_empty()); let gc = index.insert(1, &key.pubkey(), false); assert!(gc.is_empty()); - assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&true)); + assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0))); let ancestors = vec![(1, 0)].into_iter().collect(); - assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&false)); + assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&false, 1))); } #[test] @@ -230,6 +230,6 @@ mod tests { let gc = index.insert(1, &key.pubkey(), false); assert_eq!(gc, vec![(0, true)]); let ancestors = vec![].into_iter().collect(); - assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&false)); + assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&false, 1))); } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 590889edf..a246869fe 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -5,6 +5,7 @@ use crate::accounts::Accounts; use crate::accounts_db::{ErrorCounters, InstructionAccounts, InstructionLoaders}; +use crate::accounts_index::Fork; use crate::blockhash_queue::BlockhashQueue; use crate::locked_accounts_results::LockedAccountsResults; use crate::message_processor::{MessageProcessor, ProcessInstruction}; @@ -888,7 +889,9 @@ impl Bank { } pub fn get_account(&self, pubkey: &Pubkey) -> Option { - self.accounts.load_slow(&self.ancestors, pubkey) + self.accounts + .load_slow(&self.ancestors, pubkey) + .map(|(account, _)| account) } pub fn get_program_accounts_modified_since_parent( @@ -898,7 +901,7 @@ impl Bank { self.accounts.load_by_program(self.slot(), program_id) } - pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option { + pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<(Account, Fork)> { let just_self: HashMap = vec![(self.slot(), 0)].into_iter().collect(); self.accounts.load_slow(&just_self, pubkey) }