Add Confirmations parameter to RPC Subscriptions (#4154)

* Add optional depth parameter to pubsub, and store in subscriptions

* Pass bank_forks into rpc_subscription; add method to check depth before notify and impl for account subscriptions

* Impl check-depth for signature subscriptions

* Impl check-depth for program subscriptions

* Plumb fork id through accounts

* Use fork id and root to prevent repeated account notifications; also s/Depth/Confirmations

* Write tests in terms of bank_forks

* Fixup accounts tests

* Add pubsub-confirmations tests

* Update pubsub documentation
This commit is contained in:
Tyera Eulberg 2019-05-06 08:31:50 -06:00 committed by GitHub
parent 0139e5db21
commit 71f9b44687
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 430 additions and 132 deletions

View File

@ -316,6 +316,14 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m
After connect to the RPC PubSub websocket at `ws://<ADDRESS>/`:
- Submit subscription requests to the websocket using the methods below
- Multiple subscriptions may be active at once
- All subscriptions take an optional `confirmations` parameter, which defines
how many confirmed blocks the node should wait before sending a notification.
The greater the number, the more likely the notification is to represent
consensus across the cluster, and the less likely it is to be affected by
forking or rollbacks. If unspecified, the default value is 0; the node will
send a notification as soon as it witnesses the event. The maximum
`confirmations` wait length is the cluster's `MAX_LOCKOUT_HISTORY`, which
represents the economic finality of the chain.
---
@ -325,6 +333,8 @@ for a given account public key changes
##### Parameters:
* `string` - account Pubkey, as base-58 encoded string
* `integer` - optional, number of confirmed blocks to wait before notification.
Default: 0, Max: `MAX_LOCKOUT_HISTORY` (greater integers rounded down)
##### Results:
* `integer` - Subscription id (needed to unsubscribe)
@ -334,6 +344,8 @@ for a given account public key changes
// Request
{"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12"]}
{"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12", 15]}
// Result
{"jsonrpc": "2.0","result": 0,"id": 1}
```
@ -371,6 +383,8 @@ for a given account owned by the program changes
##### Parameters:
* `string` - program_id Pubkey, as base-58 encoded string
* `integer` - optional, number of confirmed blocks to wait before notification.
Default: 0, Max: `MAX_LOCKOUT_HISTORY` (greater integers rounded down)
##### Results:
* `integer` - Subscription id (needed to unsubscribe)
@ -380,6 +394,8 @@ for a given account owned by the program changes
// Request
{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["9gZbPtbtHrs6hEWgd6MbVY9VPFtS5Z8xKtnYwA2NynHV"]}
{"jsonrpc":"2.0", "id":1, "method":"programSubscribe", "params":["9gZbPtbtHrs6hEWgd6MbVY9VPFtS5Z8xKtnYwA2NynHV", 15]}
// Result
{"jsonrpc": "2.0","result": 0,"id": 1}
```
@ -419,6 +435,8 @@ On `signatureNotification`, the subscription is automatically cancelled
##### Parameters:
* `string` - Transaction Signature, as base-58 encoded string
* `integer` - optional, number of confirmed blocks to wait before notification.
Default: 0, Max: `MAX_LOCKOUT_HISTORY` (greater integers rounded down)
##### Results:
* `integer` - subscription id (needed to unsubscribe)
@ -428,6 +446,8 @@ On `signatureNotification`, the subscription is automatically cancelled
// Request
{"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b"]}
{"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b", 15]}
// Result
{"jsonrpc": "2.0","result": 0,"id": 1}
```

View File

@ -145,7 +145,7 @@ impl ReplayStage {
Self::generate_votable_banks(&bank_forks, &locktower, &mut progress);
if let Some((_, bank)) = votable.last() {
subscriptions.notify_subscribers(&bank);
subscriptions.notify_subscribers(bank.slot(), &bank_forks);
Self::handle_votable_bank(
&bank,

View File

@ -1,6 +1,6 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions};
use bs58;
use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
@ -24,7 +24,13 @@ pub trait RpcSolPubSub {
subscribe,
name = "accountSubscribe"
)]
fn account_subscribe(&self, _: Self::Metadata, _: Subscriber<Account>, _: String);
fn account_subscribe(
&self,
_: Self::Metadata,
_: Subscriber<Account>,
_: String,
_: Option<Confirmations>,
);
// Unsubscribe from account notification subscription.
#[pubsub(
@ -41,7 +47,13 @@ pub trait RpcSolPubSub {
subscribe,
name = "programSubscribe"
)]
fn program_subscribe(&self, _: Self::Metadata, _: Subscriber<(String, Account)>, _: String);
fn program_subscribe(
&self,
_: Self::Metadata,
_: Subscriber<(String, Account)>,
_: String,
_: Option<Confirmations>,
);
// Unsubscribe from account notification subscription.
#[pubsub(
@ -61,8 +73,9 @@ pub trait RpcSolPubSub {
fn signature_subscribe(
&self,
_: Self::Metadata,
_: Subscriber<Option<transaction::Result<()>>>,
_: Subscriber<transaction::Result<()>>,
_: String,
_: Option<Confirmations>,
);
// Unsubscribe from signature notification subscription.
@ -95,6 +108,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
_meta: Self::Metadata,
subscriber: Subscriber<Account>,
pubkey_str: String,
confirmations: Option<Confirmations>,
) {
let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap();
if pubkey_vec.len() != mem::size_of::<Pubkey>() {
@ -115,7 +129,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
self.subscriptions
.add_account_subscription(&pubkey, &sub_id, &sink)
.add_account_subscription(&pubkey, confirmations, &sub_id, &sink)
}
fn account_unsubscribe(
@ -140,6 +154,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
_meta: Self::Metadata,
subscriber: Subscriber<(String, Account)>,
pubkey_str: String,
confirmations: Option<Confirmations>,
) {
let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap();
if pubkey_vec.len() != mem::size_of::<Pubkey>() {
@ -160,7 +175,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
self.subscriptions
.add_program_subscription(&pubkey, &sub_id, &sink)
.add_program_subscription(&pubkey, confirmations, &sub_id, &sink)
}
fn program_unsubscribe(
@ -183,8 +198,9 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
fn signature_subscribe(
&self,
_meta: Self::Metadata,
subscriber: Subscriber<Option<transaction::Result<()>>>,
subscriber: Subscriber<transaction::Result<()>>,
signature_str: String,
confirmations: Option<Confirmations>,
) {
info!("signature_subscribe");
let signature_vec = bs58::decode(signature_str).into_vec().unwrap();
@ -199,12 +215,17 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
return;
}
let signature = Signature::new(&signature_vec);
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64);
info!(
"signature_subscribe: signature={:?} id={:?}",
signature, sub_id
);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
self.subscriptions
.add_signature_subscription(&signature, &sub_id, &sink);
.add_signature_subscription(&signature, confirmations, &sub_id, &sink);
}
fn signature_unsubscribe(
@ -228,6 +249,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
#[cfg(test)]
mod tests {
use super::*;
use crate::bank_forks::BankForks;
use jsonrpc_core::futures::sync::mpsc;
use jsonrpc_core::Response;
use jsonrpc_pubsub::{PubSubHandler, Session};
@ -237,26 +259,27 @@ mod tests {
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_program;
use solana_sdk::system_transaction;
use solana_sdk::transaction::{self, Transaction};
use std::sync::RwLock;
use std::thread::sleep;
use std::time::Duration;
use tokio::prelude::{Async, Stream};
fn process_transaction_and_notify(
bank: &Arc<Bank>,
bank_forks: &Arc<RwLock<BankForks>>,
tx: &Transaction,
subscriptions: &RpcSubscriptions,
) -> transaction::Result<Arc<Bank>> {
bank.process_transaction(tx)?;
subscriptions.notify_subscribers(&bank);
// Simulate a block boundary
Ok(Arc::new(Bank::new_from_parent(
&bank,
&Pubkey::default(),
bank.slot() + 1,
)))
) -> transaction::Result<()> {
bank_forks
.write()
.unwrap()
.get(0)
.unwrap()
.process_transaction(tx)?;
subscriptions.notify_subscribers(0, &bank_forks);
Ok(())
}
fn create_session() -> Arc<Session> {
@ -269,8 +292,8 @@ mod tests {
let bob = Keypair::new();
let bob_pubkey = bob.pubkey();
let bank = Bank::new(&genesis_block);
let arc_bank = Arc::new(bank);
let blockhash = arc_bank.last_blockhash();
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let rpc = RpcSolPubSubImpl::default();
@ -280,9 +303,9 @@ mod tests {
let session = create_session();
let (subscriber, _id_receiver, mut receiver) =
Subscriber::new_test("signatureNotification");
rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string());
rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None);
process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap();
sleep(Duration::from_millis(200));
// Test signature confirmation notification
@ -354,13 +377,18 @@ mod tests {
let budget_program_id = solana_budget_api::id();
let executable = false; // TODO
let bank = Bank::new(&genesis_block);
let arc_bank = Arc::new(bank);
let blockhash = arc_bank.last_blockhash();
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let rpc = RpcSolPubSubImpl::default();
let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, contract_state.pubkey().to_string());
rpc.account_subscribe(
session,
subscriber,
contract_state.pubkey().to_string(),
None,
);
let tx = system_transaction::create_user_account(
&alice,
@ -369,7 +397,7 @@ mod tests {
blockhash,
0,
);
let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap();
let ixs = budget_instruction::when_signed(
&contract_funds.pubkey(),
@ -380,12 +408,19 @@ mod tests {
51,
);
let tx = Transaction::new_signed_instructions(&[&contract_funds], ixs, blockhash);
let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap();
sleep(Duration::from_millis(200));
// Test signature confirmation notification #1
let string = receiver.poll();
let expected_data = arc_bank.get_account(&contract_state.pubkey()).unwrap().data;
let expected_data = bank_forks
.read()
.unwrap()
.get(0)
.unwrap()
.get_account(&contract_state.pubkey())
.unwrap()
.data;
let expected = json!({
"jsonrpc": "2.0",
"method": "accountNotification",
@ -406,7 +441,7 @@ mod tests {
let tx =
system_transaction::create_user_account(&alice, &witness.pubkey(), 1, blockhash, 0);
let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap();
sleep(Duration::from_millis(200));
let ix = budget_instruction::apply_signature(
&witness.pubkey(),
@ -414,10 +449,18 @@ mod tests {
&bob_pubkey,
);
let tx = Transaction::new_signed_instructions(&[&witness], vec![ix], blockhash);
let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap();
sleep(Duration::from_millis(200));
assert_eq!(arc_bank.get_account(&contract_state.pubkey()), None);
assert_eq!(
bank_forks
.read()
.unwrap()
.get(0)
.unwrap()
.get_account(&contract_state.pubkey()),
None
);
}
#[test]
@ -456,4 +499,80 @@ mod tests {
let result: Response = serde_json::from_str(&res.unwrap()).unwrap();
assert_eq!(expected, result);
}
#[test]
#[should_panic]
fn test_account_confirmations_not_fulfilled() {
let (genesis_block, alice) = GenesisBlock::new(10_000);
let bank = Bank::new(&genesis_block);
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let bob = Keypair::new();
let rpc = RpcSolPubSubImpl::default();
let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2));
let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash, 0);
bank_forks
.write()
.unwrap()
.get(0)
.unwrap()
.process_transaction(&tx)
.unwrap();
rpc.subscriptions.notify_subscribers(0, &bank_forks);
let _panic = receiver.poll();
}
#[test]
fn test_account_confirmations() {
let (genesis_block, alice) = GenesisBlock::new(10_000);
let bank = Bank::new(&genesis_block);
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let bob = Keypair::new();
let rpc = RpcSolPubSubImpl::default();
let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2));
let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash, 0);
bank_forks
.write()
.unwrap()
.get(0)
.unwrap()
.process_transaction(&tx)
.unwrap();
rpc.subscriptions.notify_subscribers(0, &bank_forks);
let bank0 = bank_forks.read().unwrap()[0].clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
rpc.subscriptions.notify_subscribers(1, &bank_forks);
let bank1 = bank_forks.read().unwrap()[1].clone();
let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2);
rpc.subscriptions.notify_subscribers(2, &bank_forks);
let string = receiver.poll();
let expected = json!({
"jsonrpc": "2.0",
"method": "accountNotification",
"params": {
"result": {
"owner": system_program::id(),
"lamports": 100,
"data": [],
"executable": false,
},
"subscription": 0,
}
});
if let Async::Ready(Some(response)) = string.unwrap() {
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
}
}
}

View File

@ -1,44 +1,58 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::bank_forks::BankForks;
use bs58;
use core::hash::Hash;
use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::typed::Sink;
use jsonrpc_pubsub::SubscriptionId;
use serde::Serialize;
use solana_runtime::bank::Bank;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use solana_sdk::transaction;
use solana_vote_api::vote_state::MAX_LOCKOUT_HISTORY;
use std::collections::HashMap;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
type RpcAccountSubscriptions = RwLock<HashMap<Pubkey, HashMap<SubscriptionId, Sink<Account>>>>;
pub type Confirmations = usize;
type RpcAccountSubscriptions =
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<Account>, Confirmations)>>>;
type RpcProgramSubscriptions =
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, Sink<(String, Account)>>>>;
type RpcSignatureSubscriptions =
RwLock<HashMap<Signature, HashMap<SubscriptionId, Sink<Option<transaction::Result<()>>>>>>;
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<(String, Account)>, Confirmations)>>>;
type RpcSignatureSubscriptions = RwLock<
HashMap<Signature, HashMap<SubscriptionId, (Sink<transaction::Result<()>>, Confirmations)>>,
>;
fn add_subscription<K, S>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, Sink<S>>>,
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>,
hashmap_key: &K,
confirmations: Option<Confirmations>,
sub_id: &SubscriptionId,
sink: &Sink<S>,
) where
K: Eq + Hash + Clone + Copy,
S: Clone,
{
let confirmations = confirmations.unwrap_or(0);
let confirmations = if confirmations > MAX_LOCKOUT_HISTORY {
MAX_LOCKOUT_HISTORY
} else {
confirmations
};
if let Some(current_hashmap) = subscriptions.get_mut(hashmap_key) {
current_hashmap.insert(sub_id.clone(), sink.clone());
current_hashmap.insert(sub_id.clone(), (sink.clone(), confirmations));
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), sink.clone());
hashmap.insert(sub_id.clone(), (sink.clone(), confirmations));
subscriptions.insert(*hashmap_key, hashmap);
}
fn remove_subscription<K, S>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, Sink<S>>>,
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>,
sub_id: &SubscriptionId,
) -> bool
where
@ -58,6 +72,84 @@ where
found
}
fn check_confirmations_and_notify<K, S, F, N, X>(
subscriptions: &HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>,
hashmap_key: &K,
current_slot: u64,
bank_forks: &Arc<RwLock<BankForks>>,
bank_method: F,
notify: N,
) where
K: Eq + Hash + Clone + Copy,
S: Clone + Serialize,
F: Fn(&Bank, &K) -> X,
N: Fn(X, &Sink<S>, u64),
X: Clone + Serialize,
{
let current_ancestors = bank_forks
.read()
.unwrap()
.get(current_slot)
.unwrap()
.ancestors
.clone();
if let Some(hashmap) = subscriptions.get(hashmap_key) {
for (_bank_sub_id, (sink, confirmations)) in hashmap.iter() {
let desired_slot: Vec<u64> = current_ancestors
.iter()
.filter(|(_, &v)| v == *confirmations)
.map(|(k, _)| k)
.cloned()
.collect();
let root: Vec<u64> = current_ancestors
.iter()
.filter(|(_, &v)| v == 32)
.map(|(k, _)| k)
.cloned()
.collect();
let root = if root.len() == 1 { root[0] } else { 0 };
if desired_slot.len() == 1 {
let desired_bank = bank_forks
.read()
.unwrap()
.get(desired_slot[0])
.unwrap()
.clone();
let result = bank_method(&desired_bank, hashmap_key);
notify(result, &sink, root);
}
}
}
}
fn notify_account<S>(result: Option<(S, u64)>, sink: &Sink<S>, root: u64)
where
S: Clone + Serialize,
{
if let Some((account, fork)) = result {
if fork >= root {
sink.notify(Ok(account)).wait().unwrap();
}
}
}
fn notify_signature<S>(result: Option<S>, sink: &Sink<S>, _root: u64)
where
S: Clone + Serialize,
{
if let Some(result) = result {
sink.notify(Ok(result)).wait().unwrap();
}
}
fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink<(String, Account)>, _root: u64) {
for (pubkey, account) in accounts.iter() {
sink.notify(Ok((bs58::encode(pubkey).into_string(), account.clone())))
.wait()
.unwrap();
}
}
pub struct RpcSubscriptions {
account_subscriptions: RpcAccountSubscriptions,
program_subscriptions: RpcProgramSubscriptions,
@ -75,44 +167,67 @@ impl Default for RpcSubscriptions {
}
impl RpcSubscriptions {
pub fn check_account(&self, pubkey: &Pubkey, account: &Account) {
pub fn check_account(
&self,
pubkey: &Pubkey,
current_slot: u64,
bank_forks: &Arc<RwLock<BankForks>>,
) {
let subscriptions = self.account_subscriptions.read().unwrap();
if let Some(hashmap) = subscriptions.get(pubkey) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok(account.clone())).wait().unwrap();
}
}
check_confirmations_and_notify(
&subscriptions,
pubkey,
current_slot,
bank_forks,
Bank::get_account_modified_since_parent,
notify_account,
);
}
pub fn check_program(&self, program_id: &Pubkey, pubkey: &Pubkey, account: &Account) {
pub fn check_program(
&self,
program_id: &Pubkey,
current_slot: u64,
bank_forks: &Arc<RwLock<BankForks>>,
) {
let subscriptions = self.program_subscriptions.write().unwrap();
if let Some(hashmap) = subscriptions.get(program_id) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok((bs58::encode(pubkey).into_string(), account.clone())))
.wait()
.unwrap();
}
}
check_confirmations_and_notify(
&subscriptions,
program_id,
current_slot,
bank_forks,
Bank::get_program_accounts_modified_since_parent,
notify_program,
);
}
pub fn check_signature(&self, signature: &Signature, bank_error: &transaction::Result<()>) {
pub fn check_signature(
&self,
signature: &Signature,
current_slot: u64,
bank_forks: &Arc<RwLock<BankForks>>,
) {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
if let Some(hashmap) = subscriptions.get(signature) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok(Some(bank_error.clone()))).wait().unwrap();
}
}
check_confirmations_and_notify(
&subscriptions,
signature,
current_slot,
bank_forks,
Bank::get_signature_status,
notify_signature,
);
subscriptions.remove(&signature);
}
pub fn add_account_subscription(
&self,
pubkey: &Pubkey,
confirmations: Option<Confirmations>,
sub_id: &SubscriptionId,
sink: &Sink<Account>,
) {
let mut subscriptions = self.account_subscriptions.write().unwrap();
add_subscription(&mut subscriptions, pubkey, sub_id, sink);
add_subscription(&mut subscriptions, pubkey, confirmations, sub_id, sink);
}
pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool {
@ -123,11 +238,12 @@ impl RpcSubscriptions {
pub fn add_program_subscription(
&self,
program_id: &Pubkey,
confirmations: Option<Confirmations>,
sub_id: &SubscriptionId,
sink: &Sink<(String, Account)>,
) {
let mut subscriptions = self.program_subscriptions.write().unwrap();
add_subscription(&mut subscriptions, program_id, sub_id, sink);
add_subscription(&mut subscriptions, program_id, confirmations, sub_id, sink);
}
pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool {
@ -138,11 +254,12 @@ impl RpcSubscriptions {
pub fn add_signature_subscription(
&self,
signature: &Signature,
confirmations: Option<Confirmations>,
sub_id: &SubscriptionId,
sink: &Sink<Option<transaction::Result<()>>>,
sink: &Sink<transaction::Result<()>>,
) {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
add_subscription(&mut subscriptions, signature, sub_id, sink);
add_subscription(&mut subscriptions, signature, confirmations, sub_id, sink);
}
pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool {
@ -152,15 +269,13 @@ impl RpcSubscriptions {
/// Notify subscribers of changes to any accounts or new signatures since
/// the bank's last checkpoint.
pub fn notify_subscribers(&self, bank: &Bank) {
pub fn notify_subscribers(&self, current_slot: u64, bank_forks: &Arc<RwLock<BankForks>>) {
let pubkeys: Vec<_> = {
let subs = self.account_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for pubkey in &pubkeys {
if let Some(account) = &bank.get_account_modified_since_parent(pubkey) {
self.check_account(pubkey, account);
}
self.check_account(pubkey, current_slot, bank_forks);
}
let programs: Vec<_> = {
@ -168,10 +283,7 @@ impl RpcSubscriptions {
subs.keys().cloned().collect()
};
for program_id in &programs {
let accounts = &bank.get_program_accounts_modified_since_parent(program_id);
for (pubkey, account) in accounts.iter() {
self.check_program(program_id, pubkey, account);
}
self.check_program(program_id, current_slot, bank_forks);
}
let signatures: Vec<_> = {
@ -179,8 +291,7 @@ impl RpcSubscriptions {
subs.keys().cloned().collect()
};
for signature in &signatures {
let status = bank.get_signature_status(signature).unwrap();
self.check_signature(signature, &status);
self.check_signature(signature, current_slot, bank_forks);
}
}
}
@ -199,8 +310,9 @@ mod tests {
fn test_check_account_subscribe() {
let (genesis_block, mint_keypair) = GenesisBlock::new(100);
let bank = Bank::new(&genesis_block);
let alice = Keypair::new();
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let alice = Keypair::new();
let tx = system_transaction::create_account(
&mint_keypair,
&alice.pubkey(),
@ -210,14 +322,20 @@ mod tests {
&solana_budget_api::id(),
0,
);
bank.process_transaction(&tx).unwrap();
bank_forks
.write()
.unwrap()
.get(0)
.unwrap()
.process_transaction(&tx)
.unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default();
subscriptions.add_account_subscription(&alice.pubkey(), &sub_id, &sink);
subscriptions.add_account_subscription(&alice.pubkey(), None, &sub_id, &sink);
assert!(subscriptions
.account_subscriptions
@ -225,8 +343,7 @@ mod tests {
.unwrap()
.contains_key(&alice.pubkey()));
let account = bank.get_account(&alice.pubkey()).unwrap();
subscriptions.check_account(&alice.pubkey(), &account);
subscriptions.check_account(&alice.pubkey(), 0, &bank_forks);
let string = transport_receiver.poll();
if let Async::Ready(Some(response)) = string.unwrap() {
let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"data":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"executable":false,"lamports":1,"owner":[2,203,81,223,225,24,34,35,203,214,138,130,144,208,35,77,63,16,87,51,47,198,115,123,98,188,19,160,0,0,0,0]}},"subscription":0}}}}"#);
@ -245,8 +362,9 @@ mod tests {
fn test_check_program_subscribe() {
let (genesis_block, mint_keypair) = GenesisBlock::new(100);
let bank = Bank::new(&genesis_block);
let alice = Keypair::new();
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let alice = Keypair::new();
let tx = system_transaction::create_account(
&mint_keypair,
&alice.pubkey(),
@ -256,14 +374,20 @@ mod tests {
&solana_budget_api::id(),
0,
);
bank.process_transaction(&tx).unwrap();
bank_forks
.write()
.unwrap()
.get(0)
.unwrap()
.process_transaction(&tx)
.unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("programNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default();
subscriptions.add_program_subscription(&solana_budget_api::id(), &sub_id, &sink);
subscriptions.add_program_subscription(&solana_budget_api::id(), None, &sub_id, &sink);
assert!(subscriptions
.program_subscriptions
@ -271,8 +395,7 @@ mod tests {
.unwrap()
.contains_key(&solana_budget_api::id()));
let account = bank.get_account(&alice.pubkey()).unwrap();
subscriptions.check_program(&solana_budget_api::id(), &alice.pubkey(), &account);
subscriptions.check_program(&solana_budget_api::id(), 0, &bank_forks);
let string = transport_receiver.poll();
if let Async::Ready(Some(response)) = string.unwrap() {
let expected = format!(r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":["{:?}",{{"data":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"executable":false,"lamports":1,"owner":[2,203,81,223,225,24,34,35,203,214,138,130,144,208,35,77,63,16,87,51,47,198,115,123,98,188,19,160,0,0,0,0]}}],"subscription":0}}}}"#, alice.pubkey());
@ -290,18 +413,25 @@ mod tests {
fn test_check_signature_subscribe() {
let (genesis_block, mint_keypair) = GenesisBlock::new(100);
let bank = Bank::new(&genesis_block);
let alice = Keypair::new();
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let alice = Keypair::new();
let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 20, blockhash, 0);
let signature = tx.signatures[0];
bank.process_transaction(&tx).unwrap();
bank_forks
.write()
.unwrap()
.get(0)
.unwrap()
.process_transaction(&tx)
.unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("signatureNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default();
subscriptions.add_signature_subscription(&signature, &sub_id, &sink);
subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink);
assert!(subscriptions
.signature_subscriptions
@ -309,7 +439,7 @@ mod tests {
.unwrap()
.contains_key(&signature));
subscriptions.check_signature(&signature, &Ok(()));
subscriptions.check_signature(&signature, 0, &bank_forks);
let string = transport_receiver.poll();
if let Async::Ready(Some(response)) = string.unwrap() {
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));

View File

@ -163,7 +163,9 @@ impl Accounts {
let mut called_accounts: Vec<Account> = vec![];
for key in &message.account_keys {
called_accounts.push(
AccountsDB::load(storage, ancestors, accounts_index, key).unwrap_or_default(),
AccountsDB::load(storage, ancestors, accounts_index, key)
.map(|(account, _)| account)
.unwrap_or_default(),
);
}
if called_accounts.is_empty() || called_accounts[0].lamports == 0 {
@ -201,7 +203,9 @@ impl Accounts {
}
depth += 1;
let program = match AccountsDB::load(storage, ancestors, accounts_index, &program_id) {
let program = match AccountsDB::load(storage, ancestors, accounts_index, &program_id)
.map(|(account, _)| account)
{
Some(program) => program,
None => {
error_counters.account_not_found += 1;
@ -289,10 +293,14 @@ impl Accounts {
}
/// Slow because lock is held for 1 operation instead of many
pub fn load_slow(&self, ancestors: &HashMap<Fork, usize>, pubkey: &Pubkey) -> Option<Account> {
pub fn load_slow(
&self,
ancestors: &HashMap<Fork, usize>,
pubkey: &Pubkey,
) -> Option<(Account, Fork)> {
self.accounts_db
.load_slow(ancestors, pubkey)
.filter(|acc| acc.lamports != 0)
.filter(|(acc, _)| acc.lamports != 0)
}
pub fn load_by_program(&self, fork: Fork, program_id: &Pubkey) -> Vec<(Pubkey, Account)> {

View File

@ -259,15 +259,20 @@ impl AccountsDB {
ancestors: &HashMap<Fork, usize>,
accounts_index: &AccountsIndex<AccountInfo>,
pubkey: &Pubkey,
) -> Option<Account> {
let info = accounts_index.get(pubkey, ancestors)?;
) -> Option<(Account, Fork)> {
let (info, fork) = accounts_index.get(pubkey, ancestors)?;
//TODO: thread this as a ref
storage
.get(&info.id)
.and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account()))
.map(|account| (account, fork))
}
pub fn load_slow(&self, ancestors: &HashMap<Fork, usize>, pubkey: &Pubkey) -> Option<Account> {
pub fn load_slow(
&self,
ancestors: &HashMap<Fork, usize>,
pubkey: &Pubkey,
) -> Option<(Account, Fork)> {
let accounts_index = self.accounts_index.read().unwrap();
let storage = self.storage.read().unwrap();
Self::load(&storage, ancestors, &accounts_index, pubkey)
@ -480,7 +485,7 @@ mod tests {
db.store(0, &[(&key, &account0)]);
db.add_root(0);
let ancestors = vec![(1, 1)].into_iter().collect();
assert_eq!(db.load_slow(&ancestors, &key), Some(account0));
assert_eq!(db.load_slow(&ancestors, &key), Some((account0, 0)));
}
#[test]
@ -497,10 +502,10 @@ mod tests {
db.store(1, &[(&key, &account1)]);
let ancestors = vec![(1, 1)].into_iter().collect();
assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1);
assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
let ancestors = vec![(1, 1), (0, 0)].into_iter().collect();
assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1);
assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
}
#[test]
@ -518,10 +523,10 @@ mod tests {
db.add_root(0);
let ancestors = vec![(1, 1)].into_iter().collect();
assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1);
assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
let ancestors = vec![(1, 1), (0, 0)].into_iter().collect();
assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1);
assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
}
#[test]
@ -552,18 +557,18 @@ mod tests {
// original account (but could also accept "None", which is implemented
// at the Accounts level)
let ancestors = vec![(0, 0), (1, 1)].into_iter().collect();
assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1);
assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
// we should see 1 token in fork 2
let ancestors = vec![(0, 0), (2, 2)].into_iter().collect();
assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account0);
assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account0);
db.add_root(0);
let ancestors = vec![(1, 1)].into_iter().collect();
assert_eq!(db.load_slow(&ancestors, &key), Some(account1));
assert_eq!(db.load_slow(&ancestors, &key), Some((account1, 1)));
let ancestors = vec![(2, 2)].into_iter().collect();
assert_eq!(db.load_slow(&ancestors, &key), Some(account0)); // original value
assert_eq!(db.load_slow(&ancestors, &key), Some((account0, 0))); // original value
}
#[test]
@ -579,7 +584,7 @@ mod tests {
let account = db.load_slow(&ancestors, &pubkeys[idx]).unwrap();
let mut default_account = Account::default();
default_account.lamports = (idx + 1) as u64;
assert_eq!(default_account, account);
assert_eq!((default_account, 0), account);
}
db.add_root(0);
@ -593,8 +598,8 @@ mod tests {
let account1 = db.load_slow(&ancestors, &pubkeys[idx]).unwrap();
let mut default_account = Account::default();
default_account.lamports = (idx + 1) as u64;
assert_eq!(&default_account, &account0);
assert_eq!(&default_account, &account1);
assert_eq!(&default_account, &account0.0);
assert_eq!(&default_account, &account1.0);
}
}
@ -650,9 +655,9 @@ mod tests {
// masking accounts is done at the Accounts level, at accountsDB we see
// original account
let ancestors = vec![(0, 0), (1, 1)].into_iter().collect();
assert_eq!(db0.load_slow(&ancestors, &key), Some(account1));
assert_eq!(db0.load_slow(&ancestors, &key), Some((account1, 1)));
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(db0.load_slow(&ancestors, &key), Some(account0));
assert_eq!(db0.load_slow(&ancestors, &key), Some((account0, 0)));
}
fn create_account(
@ -685,7 +690,7 @@ mod tests {
for _ in 1..1000 {
let idx = thread_rng().gen_range(0, range);
let ancestors = vec![(fork, 0)].into_iter().collect();
if let Some(mut account) = accounts.load_slow(&ancestors, &pubkeys[idx]) {
if let Some((mut account, _)) = accounts.load_slow(&ancestors, &pubkeys[idx]) {
account.lamports = account.lamports + 1;
accounts.store(fork, &[(&pubkeys[idx], &account)]);
if account.lamports == 0 {
@ -714,7 +719,7 @@ mod tests {
let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap();
let mut default_account = Account::default();
default_account.lamports = (idx + 1) as u64;
assert_eq!(default_account, account);
assert_eq!((default_account, 0), account);
}
}
@ -728,7 +733,7 @@ mod tests {
let account = accounts.load_slow(&ancestors, &pubkeys[0]).unwrap();
let mut default_account = Account::default();
default_account.lamports = 1;
assert_eq!(default_account, account);
assert_eq!((default_account, 0), account);
}
#[test]
@ -765,7 +770,7 @@ mod tests {
for (i, key) in keys.iter().enumerate() {
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(
accounts.load_slow(&ancestors, &key).unwrap().lamports,
accounts.load_slow(&ancestors, &key).unwrap().0.lamports,
(i as u64) + 1
);
}
@ -810,8 +815,14 @@ mod tests {
assert_eq!(stores[&1].status(), AccountStorageStatus::StorageAvailable);
}
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1);
assert_eq!(accounts.load_slow(&ancestors, &pubkey2).unwrap(), account2);
assert_eq!(
accounts.load_slow(&ancestors, &pubkey1).unwrap().0,
account1
);
assert_eq!(
accounts.load_slow(&ancestors, &pubkey2).unwrap().0,
account2
);
// lots of stores, but 3 storages should be enough for everything
for i in 0..25 {
@ -828,8 +839,14 @@ mod tests {
assert_eq!(stores[&2].status(), status[0]);
}
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1);
assert_eq!(accounts.load_slow(&ancestors, &pubkey2).unwrap(), account2);
assert_eq!(
accounts.load_slow(&ancestors, &pubkey1).unwrap().0,
account1
);
assert_eq!(
accounts.load_slow(&ancestors, &pubkey2).unwrap().0,
account2
);
}
}
@ -875,6 +892,7 @@ mod tests {
.unwrap()
.get(&pubkey, &ancestors)
.unwrap()
.0
.clone();
//fork 0 is behind root, but it is not root, therefore it is purged
accounts.add_root(1);
@ -891,7 +909,7 @@ mod tests {
//new value is there
let ancestors = vec![(1, 1)].into_iter().collect();
assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some(account));;
assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1)));
}
}

View File

@ -15,21 +15,21 @@ pub struct AccountsIndex<T> {
impl<T: Clone> AccountsIndex<T> {
/// Get an account
/// The latest account that appears in `ancestors` or `roots` is returned.
pub fn get(&self, pubkey: &Pubkey, ancestors: &HashMap<Fork, usize>) -> Option<&T> {
pub fn get(&self, pubkey: &Pubkey, ancestors: &HashMap<Fork, usize>) -> Option<(&T, Fork)> {
let list = self.account_maps.get(pubkey)?;
let mut max = 0;
let mut rv = None;
for e in list.iter().rev() {
if e.0 >= max && (ancestors.get(&e.0).is_some() || self.is_root(e.0)) {
trace!("GET {} {:?}", e.0, ancestors);
rv = Some(&e.1);
rv = Some((&e.1, e.0));
max = e.0;
}
}
rv
}
/// Insert a new fork.
/// Insert a new fork.
/// @retval - The return value contains any squashed accounts that can freed from storage.
pub fn insert(&mut self, fork: Fork, pubkey: &Pubkey, account_info: T) -> Vec<(Fork, T)> {
let mut rv = vec![];
@ -123,7 +123,7 @@ mod tests {
assert!(gc.is_empty());
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&true));
assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0)));
}
#[test]
@ -143,7 +143,7 @@ mod tests {
let ancestors = vec![].into_iter().collect();
index.add_root(0);
assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&true));
assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0)));
}
#[test]
@ -199,11 +199,11 @@ mod tests {
let ancestors = vec![(0, 0)].into_iter().collect();
let gc = index.insert(0, &key.pubkey(), true);
assert!(gc.is_empty());
assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&true));
assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0)));
let gc = index.insert(0, &key.pubkey(), false);
assert_eq!(gc, vec![(0, true)]);
assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&false));
assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&false, 0)));
}
#[test]
@ -215,9 +215,9 @@ mod tests {
assert!(gc.is_empty());
let gc = index.insert(1, &key.pubkey(), false);
assert!(gc.is_empty());
assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&true));
assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0)));
let ancestors = vec![(1, 0)].into_iter().collect();
assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&false));
assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&false, 1)));
}
#[test]
@ -230,6 +230,6 @@ mod tests {
let gc = index.insert(1, &key.pubkey(), false);
assert_eq!(gc, vec![(0, true)]);
let ancestors = vec![].into_iter().collect();
assert_eq!(index.get(&key.pubkey(), &ancestors), Some(&false));
assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&false, 1)));
}
}

View File

@ -5,6 +5,7 @@
use crate::accounts::Accounts;
use crate::accounts_db::{ErrorCounters, InstructionAccounts, InstructionLoaders};
use crate::accounts_index::Fork;
use crate::blockhash_queue::BlockhashQueue;
use crate::locked_accounts_results::LockedAccountsResults;
use crate::message_processor::{MessageProcessor, ProcessInstruction};
@ -888,7 +889,9 @@ impl Bank {
}
pub fn get_account(&self, pubkey: &Pubkey) -> Option<Account> {
self.accounts.load_slow(&self.ancestors, pubkey)
self.accounts
.load_slow(&self.ancestors, pubkey)
.map(|(account, _)| account)
}
pub fn get_program_accounts_modified_since_parent(
@ -898,7 +901,7 @@ impl Bank {
self.accounts.load_by_program(self.slot(), program_id)
}
pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<Account> {
pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<(Account, Fork)> {
let just_self: HashMap<u64, usize> = vec![(self.slot(), 0)].into_iter().collect();
self.accounts.load_slow(&just_self, pubkey)
}