Retain signature subscriptions that haven't been notified (#8261)

This commit is contained in:
Justin Starry 2020-02-14 01:00:50 +08:00 committed by GitHub
parent bd257050e3
commit 1c97b31eaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 54 additions and 17 deletions

View File

@ -17,7 +17,7 @@ use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender};
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::{ use std::{
collections::HashMap, collections::{HashMap, HashSet},
sync::{Arc, Mutex, RwLock}, sync::{Arc, Mutex, RwLock},
}; };
@ -113,11 +113,12 @@ fn check_confirmations_and_notify<K, S, F, N, X>(
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
bank_method: F, bank_method: F,
notify: N, notify: N,
) where ) -> HashSet<SubscriptionId>
where
K: Eq + Hash + Clone + Copy, K: Eq + Hash + Clone + Copy,
S: Clone + Serialize, S: Clone + Serialize,
F: Fn(&Bank, &K) -> X, F: Fn(&Bank, &K) -> X,
N: Fn(X, &Sink<S>, u64), N: Fn(X, &Sink<S>, u64) -> bool,
X: Clone + Serialize, X: Clone + Serialize,
{ {
let current_ancestors = bank_forks let current_ancestors = bank_forks
@ -127,8 +128,10 @@ fn check_confirmations_and_notify<K, S, F, N, X>(
.unwrap() .unwrap()
.ancestors .ancestors
.clone(); .clone();
let mut notified_set: HashSet<SubscriptionId> = HashSet::new();
if let Some(hashmap) = subscriptions.get(hashmap_key) { if let Some(hashmap) = subscriptions.get(hashmap_key) {
for (_bank_sub_id, (sink, confirmations)) in hashmap.iter() { for (bank_sub_id, (sink, confirmations)) in hashmap.iter() {
let desired_slot: Vec<u64> = current_ancestors let desired_slot: Vec<u64> = current_ancestors
.iter() .iter()
.filter(|(_, &v)| v == *confirmations) .filter(|(_, &v)| v == *confirmations)
@ -150,30 +153,41 @@ fn check_confirmations_and_notify<K, S, F, N, X>(
.unwrap() .unwrap()
.clone(); .clone();
let result = bank_method(&desired_bank, hashmap_key); let result = bank_method(&desired_bank, hashmap_key);
notify(result, &sink, root); if notify(result, &sink, root) {
notified_set.insert(bank_sub_id.clone());
} }
} }
} }
}
notified_set
} }
fn notify_account(result: Option<(Account, Slot)>, sink: &Sink<RpcAccount>, root: Slot) { fn notify_account(result: Option<(Account, Slot)>, sink: &Sink<RpcAccount>, root: Slot) -> bool {
if let Some((account, fork)) = result { if let Some((account, fork)) = result {
if fork >= root { if fork >= root {
sink.notify(Ok(RpcAccount::encode(account))).wait().unwrap(); sink.notify(Ok(RpcAccount::encode(account))).wait().unwrap();
return true;
} }
} }
false
} }
fn notify_signature<S>(result: Option<S>, sink: &Sink<S>, _root: Slot) fn notify_signature<S>(result: Option<S>, sink: &Sink<S>, _root: Slot) -> bool
where where
S: Clone + Serialize, S: Clone + Serialize,
{ {
if let Some(result) = result { if let Some(result) = result {
sink.notify(Ok(result)).wait().unwrap(); sink.notify(Ok(result)).wait().unwrap();
return true;
} }
false
} }
fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink<RpcKeyedAccount>, _root: Slot) { fn notify_program(
accounts: Vec<(Pubkey, Account)>,
sink: &Sink<RpcKeyedAccount>,
_root: Slot,
) -> bool {
for (pubkey, account) in accounts.iter() { for (pubkey, account) in accounts.iter() {
sink.notify(Ok(RpcKeyedAccount { sink.notify(Ok(RpcKeyedAccount {
pubkey: pubkey.to_string(), pubkey: pubkey.to_string(),
@ -182,6 +196,7 @@ fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink<RpcKeyedAccount>
.wait() .wait()
.unwrap(); .unwrap();
} }
!accounts.is_empty()
} }
pub struct RpcSubscriptions { pub struct RpcSubscriptions {
@ -293,7 +308,7 @@ impl RpcSubscriptions {
signature_subscriptions: Arc<RpcSignatureSubscriptions>, signature_subscriptions: Arc<RpcSignatureSubscriptions>,
) { ) {
let mut subscriptions = signature_subscriptions.write().unwrap(); let mut subscriptions = signature_subscriptions.write().unwrap();
check_confirmations_and_notify( let notified_ids = check_confirmations_and_notify(
&subscriptions, &subscriptions,
signature, signature,
current_slot, current_slot,
@ -301,8 +316,13 @@ impl RpcSubscriptions {
Bank::get_signature_status, Bank::get_signature_status,
notify_signature, notify_signature,
); );
if let Some(subscription_ids) = subscriptions.get_mut(signature) {
subscription_ids.retain(|k, _| !notified_ids.contains(k));
if subscription_ids.is_empty() {
subscriptions.remove(&signature); subscriptions.remove(&signature);
} }
}
}
pub fn add_account_subscription( pub fn add_account_subscription(
&self, &self,
@ -637,6 +657,9 @@ pub(crate) mod tests {
let alice = Keypair::new(); let alice = Keypair::new();
let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 20, blockhash); let tx = system_transaction::transfer(&mint_keypair, &alice.pubkey(), 20, blockhash);
let signature = tx.signatures[0]; let signature = tx.signatures[0];
let unprocessed_tx =
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 10, blockhash);
let not_ready_signature = unprocessed_tx.signatures[0];
bank_forks bank_forks
.write() .write()
.unwrap() .unwrap()
@ -648,16 +671,23 @@ pub(crate) mod tests {
let (subscriber, _id_receiver, transport_receiver) = let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("signatureNotification"); Subscriber::new_test("signatureNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let remaining_sub_id = SubscriptionId::Number(1 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit); let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink); subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink.clone());
subscriptions.add_signature_subscription(
&not_ready_signature,
None,
&remaining_sub_id,
&sink.clone(),
);
assert!(subscriptions {
.signature_subscriptions let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
.read() assert!(sig_subs.contains_key(&signature));
.unwrap() assert!(sig_subs.contains_key(&not_ready_signature));
.contains_key(&signature)); }
subscriptions.notify_subscribers(0, &bank_forks); subscriptions.notify_subscribers(0, &bank_forks);
let response = robust_poll_or_panic(transport_receiver); let response = robust_poll_or_panic(transport_receiver);
@ -670,12 +700,19 @@ pub(crate) mod tests {
); );
assert_eq!(expected, response); assert_eq!(expected, response);
subscriptions.remove_signature_subscription(&sub_id); // Subscription should be automatically removed after notification
assert!(!subscriptions assert!(!subscriptions
.signature_subscriptions .signature_subscriptions
.read() .read()
.unwrap() .unwrap()
.contains_key(&signature)); .contains_key(&signature));
// Unprocessed signature subscription should not be removed
assert!(subscriptions
.signature_subscriptions
.read()
.unwrap()
.contains_key(&not_ready_signature));
} }
#[test] #[test]