feat: implement RPC notification queue (#7863)

This commit is contained in:
Sunny Gleason 2020-01-20 16:08:29 -05:00 committed by GitHub
parent cc299053cc
commit 5cf090c896
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 325 additions and 167 deletions

View File

@ -1311,7 +1311,8 @@ pub(crate) mod tests {
let genesis_config = create_genesis_config(10_000).genesis_config; let genesis_config = create_genesis_config(10_000).genesis_config;
let bank0 = Bank::new(&genesis_config); let bank0 = Bank::new(&genesis_config);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let subscriptions = Arc::new(RpcSubscriptions::default()); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = Arc::new(RpcSubscriptions::new(&exit));
let bank_forks = BankForks::new(0, bank0); let bank_forks = BankForks::new(0, bank0);
bank_forks.working_bank().freeze(); bank_forks.working_bank().freeze();

View File

@ -280,6 +280,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
mod tests { mod tests {
use super::*; use super::*;
use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use crate::rpc_subscriptions::tests::robust_poll_or_panic;
use jsonrpc_core::{futures::sync::mpsc, Response}; use jsonrpc_core::{futures::sync::mpsc, Response};
use jsonrpc_pubsub::{PubSubHandler, Session}; use jsonrpc_pubsub::{PubSubHandler, Session};
use solana_budget_program::{self, budget_instruction}; use solana_budget_program::{self, budget_instruction};
@ -292,7 +293,6 @@ mod tests {
transaction::{self, Transaction}, transaction::{self, Transaction},
}; };
use std::{sync::RwLock, thread::sleep, time::Duration}; use std::{sync::RwLock, thread::sleep, time::Duration};
use tokio::prelude::{Async, Stream};
fn process_transaction_and_notify( fn process_transaction_and_notify(
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
@ -332,25 +332,21 @@ mod tests {
let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash); let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash);
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, mut receiver) = let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification");
Subscriber::new_test("signatureNotification");
rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None); rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None);
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap();
sleep(Duration::from_millis(200));
// Test signature confirmation notification // Test signature confirmation notification
let string = receiver.poll(); let response = robust_poll_or_panic(receiver);
if let Async::Ready(Some(response)) = string.unwrap() { let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
let expected_res: Option<transaction::Result<()>> = Some(Ok(())); let expected_res_str =
let expected_res_str = serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap();
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); let expected = format!(
let expected = format!( r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#,
r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str
expected_res_str );
); assert_eq!(expected, response);
assert_eq!(expected, response);
}
} }
#[test] #[test]
@ -425,7 +421,7 @@ mod tests {
let rpc = RpcSolPubSubImpl::default(); let rpc = RpcSolPubSubImpl::default();
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe( rpc.account_subscribe(
session, session,
subscriber, subscriber,
@ -453,7 +449,6 @@ mod tests {
sleep(Duration::from_millis(200)); sleep(Duration::from_millis(200));
// Test signature confirmation notification #1 // Test signature confirmation notification #1
let string = receiver.poll();
let expected_data = bank_forks let expected_data = bank_forks
.read() .read()
.unwrap() .unwrap()
@ -477,9 +472,8 @@ mod tests {
} }
}); });
if let Async::Ready(Some(response)) = string.unwrap() { let response = robust_poll_or_panic(receiver);
assert_eq!(serde_json::to_string(&expected).unwrap(), response); assert_eq!(serde_json::to_string(&expected).unwrap(), response);
}
let tx = system_transaction::transfer(&alice, &witness.pubkey(), 1, blockhash); let tx = system_transaction::transfer(&alice, &witness.pubkey(), 1, blockhash);
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap();
@ -558,7 +552,7 @@ mod tests {
let rpc = RpcSolPubSubImpl::default(); let rpc = RpcSolPubSubImpl::default();
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2));
let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash);
@ -570,7 +564,9 @@ mod tests {
.process_transaction(&tx) .process_transaction(&tx)
.unwrap(); .unwrap();
rpc.subscriptions.notify_subscribers(0, &bank_forks); rpc.subscriptions.notify_subscribers(0, &bank_forks);
let _panic = receiver.poll(); // allow 200ms for notification thread to wake
std::thread::sleep(Duration::from_millis(200));
let _panic = robust_poll_or_panic(receiver);
} }
#[test] #[test]
@ -587,7 +583,7 @@ mod tests {
let rpc = RpcSolPubSubImpl::default(); let rpc = RpcSolPubSubImpl::default();
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2));
let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash);
@ -608,7 +604,6 @@ mod tests {
let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2); bank_forks.write().unwrap().insert(bank2);
rpc.subscriptions.notify_subscribers(2, &bank_forks); rpc.subscriptions.notify_subscribers(2, &bank_forks);
let string = receiver.poll();
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "accountNotification", "method": "accountNotification",
@ -623,61 +618,54 @@ mod tests {
"subscription": 0, "subscription": 0,
} }
}); });
if let Async::Ready(Some(response)) = string.unwrap() { let response = robust_poll_or_panic(receiver);
assert_eq!(serde_json::to_string(&expected).unwrap(), response); assert_eq!(serde_json::to_string(&expected).unwrap(), response);
}
} }
#[test] #[test]
fn test_slot_subscribe() { fn test_slot_subscribe() {
let rpc = RpcSolPubSubImpl::default(); let rpc = RpcSolPubSubImpl::default();
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("slotNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification");
rpc.slot_subscribe(session, subscriber); rpc.slot_subscribe(session, subscriber);
rpc.subscriptions.notify_slot(0, 0, 0); rpc.subscriptions.notify_slot(0, 0, 0);
// Test slot confirmation notification // Test slot confirmation notification
let string = receiver.poll(); let response = robust_poll_or_panic(receiver);
if let Async::Ready(Some(response)) = string.unwrap() { let expected_res = SlotInfo {
let expected_res = SlotInfo { parent: 0,
parent: 0, slot: 0,
slot: 0, root: 0,
root: 0, };
}; let expected_res_str =
let expected_res_str = serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap();
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); let expected = format!(
let expected = format!( r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#,
r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str
expected_res_str );
); assert_eq!(expected, response);
assert_eq!(expected, response);
}
} }
#[test] #[test]
fn test_slot_unsubscribe() { fn test_slot_unsubscribe() {
let rpc = RpcSolPubSubImpl::default(); let rpc = RpcSolPubSubImpl::default();
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("slotNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification");
rpc.slot_subscribe(session, subscriber); rpc.slot_subscribe(session, subscriber);
rpc.subscriptions.notify_slot(0, 0, 0); rpc.subscriptions.notify_slot(0, 0, 0);
let response = robust_poll_or_panic(receiver);
let string = receiver.poll(); let expected_res = SlotInfo {
if let Async::Ready(Some(response)) = string.unwrap() { parent: 0,
let expected_res = SlotInfo { slot: 0,
parent: 0, root: 0,
slot: 0, };
root: 0, let expected_res_str =
}; serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap();
let expected_res_str = let expected = format!(
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#,
let expected = format!( expected_res_str
r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, );
expected_res_str assert_eq!(expected, response);
);
assert_eq!(expected, response);
}
let session = create_session(); let session = create_session();
assert!(rpc assert!(rpc

View File

@ -68,9 +68,9 @@ mod tests {
#[test] #[test]
fn test_pubsub_new() { fn test_pubsub_new() {
let subscriptions = Arc::new(RpcSubscriptions::default());
let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = Arc::new(RpcSubscriptions::new(&exit));
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit);
let thread = pubsub_service.thread_hdl.thread(); let thread = pubsub_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-pubsub"); assert_eq!(thread.name().unwrap(), "solana-pubsub");

View File

@ -11,20 +11,45 @@ use solana_sdk::{
account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction, account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction,
}; };
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::ops::DerefMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{Arc, RwLock}, sync::{Arc, Mutex, RwLock},
}; };
const RECEIVE_DELAY_MILLIS: u64 = 100;
pub type Confirmations = usize; pub type Confirmations = usize;
#[derive(Serialize, Clone)] #[derive(Serialize, Clone, Copy, Debug)]
pub struct SlotInfo { pub struct SlotInfo {
pub slot: Slot, pub slot: Slot,
pub parent: Slot, pub parent: Slot,
pub root: Slot, pub root: Slot,
} }
enum NotificationEntry {
Slot(SlotInfo),
Bank((Slot, Arc<RwLock<BankForks>>)),
}
impl std::fmt::Debug for NotificationEntry {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
NotificationEntry::Bank((current_slot, _)) => {
write!(f, "Bank({{current_slot: {:?}}})", current_slot)
}
}
}
}
type NotificationSend = Arc<Mutex<NotificationEntry>>;
type RpcAccountSubscriptions = type RpcAccountSubscriptions =
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<RpcAccount>, Confirmations)>>>; RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<RpcAccount>, Confirmations)>>>;
type RpcProgramSubscriptions = type RpcProgramSubscriptions =
@ -159,31 +184,80 @@ fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink<RpcKeyedAccount>
} }
pub struct RpcSubscriptions { pub struct RpcSubscriptions {
account_subscriptions: RpcAccountSubscriptions, account_subscriptions: Arc<RpcAccountSubscriptions>,
program_subscriptions: RpcProgramSubscriptions, program_subscriptions: Arc<RpcProgramSubscriptions>,
signature_subscriptions: RpcSignatureSubscriptions, signature_subscriptions: Arc<RpcSignatureSubscriptions>,
slot_subscriptions: RpcSlotSubscriptions, slot_subscriptions: Arc<RpcSlotSubscriptions>,
notification_sender: Arc<Mutex<Sender<Arc<Mutex<NotificationEntry>>>>>,
t_cleanup: Option<JoinHandle<()>>,
exit: Arc<AtomicBool>,
} }
impl Default for RpcSubscriptions { impl Default for RpcSubscriptions {
fn default() -> Self { fn default() -> Self {
RpcSubscriptions { Self::new(&Arc::new(AtomicBool::new(false)))
account_subscriptions: RpcAccountSubscriptions::default(), }
program_subscriptions: RpcProgramSubscriptions::default(), }
signature_subscriptions: RpcSignatureSubscriptions::default(),
slot_subscriptions: RpcSlotSubscriptions::default(), impl Drop for RpcSubscriptions {
} fn drop(&mut self) {
self.shutdown().unwrap_or_else(|err| {
warn!("RPC Notification - shutdown error: {:?}", err);
});
} }
} }
impl RpcSubscriptions { impl RpcSubscriptions {
pub fn check_account( pub fn new(exit: &Arc<AtomicBool>) -> Self {
&self, let (notification_sender, notification_receiver): (
Sender<NotificationSend>,
Receiver<NotificationSend>,
) = std::sync::mpsc::channel();
let account_subscriptions = Arc::new(RpcAccountSubscriptions::default());
let program_subscriptions = Arc::new(RpcProgramSubscriptions::default());
let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default());
let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default());
let notification_sender = Arc::new(Mutex::new(notification_sender));
let exit_clone = exit.clone();
let account_subscriptions_clone = account_subscriptions.clone();
let program_subscriptions_clone = program_subscriptions.clone();
let signature_subscriptions_clone = signature_subscriptions.clone();
let slot_subscriptions_clone = slot_subscriptions.clone();
let t_cleanup = Builder::new()
.name("solana-rpc-notifications".to_string())
.spawn(move || {
Self::process_notifications(
exit_clone,
notification_receiver,
account_subscriptions_clone,
program_subscriptions_clone,
signature_subscriptions_clone,
slot_subscriptions_clone,
);
})
.unwrap();
Self {
account_subscriptions,
program_subscriptions,
signature_subscriptions,
slot_subscriptions,
notification_sender,
t_cleanup: Some(t_cleanup),
exit: exit.clone(),
}
}
fn check_account(
pubkey: &Pubkey, pubkey: &Pubkey,
current_slot: Slot, current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
account_subscriptions: Arc<RpcAccountSubscriptions>,
) { ) {
let subscriptions = self.account_subscriptions.read().unwrap(); let subscriptions = account_subscriptions.read().unwrap();
check_confirmations_and_notify( check_confirmations_and_notify(
&subscriptions, &subscriptions,
pubkey, pubkey,
@ -194,13 +268,13 @@ impl RpcSubscriptions {
); );
} }
pub fn check_program( fn check_program(
&self,
program_id: &Pubkey, program_id: &Pubkey,
current_slot: Slot, current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
program_subscriptions: Arc<RpcProgramSubscriptions>,
) { ) {
let subscriptions = self.program_subscriptions.write().unwrap(); let subscriptions = program_subscriptions.read().unwrap();
check_confirmations_and_notify( check_confirmations_and_notify(
&subscriptions, &subscriptions,
program_id, program_id,
@ -211,13 +285,13 @@ impl RpcSubscriptions {
); );
} }
pub fn check_signature( fn check_signature(
&self,
signature: &Signature, signature: &Signature,
current_slot: Slot, current_slot: Slot,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
) { ) {
let mut subscriptions = self.signature_subscriptions.write().unwrap(); let mut subscriptions = signature_subscriptions.write().unwrap();
check_confirmations_and_notify( check_confirmations_and_notify(
&subscriptions, &subscriptions,
signature, signature,
@ -280,29 +354,7 @@ impl RpcSubscriptions {
/// Notify subscribers of changes to any accounts or new signatures since /// Notify subscribers of changes to any accounts or new signatures since
/// the bank's last checkpoint. /// the bank's last checkpoint.
pub fn notify_subscribers(&self, current_slot: Slot, bank_forks: &Arc<RwLock<BankForks>>) { pub fn notify_subscribers(&self, current_slot: Slot, bank_forks: &Arc<RwLock<BankForks>>) {
let pubkeys: Vec<_> = { self.enqueue_notification(NotificationEntry::Bank((current_slot, bank_forks.clone())));
let subs = self.account_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for pubkey in &pubkeys {
self.check_account(pubkey, current_slot, bank_forks);
}
let programs: Vec<_> = {
let subs = self.program_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for program_id in &programs {
self.check_program(program_id, current_slot, bank_forks);
}
let signatures: Vec<_> = {
let subs = self.signature_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for signature in &signatures {
self.check_signature(signature, current_slot, bank_forks);
}
} }
pub fn add_slot_subscription(&self, sub_id: &SubscriptionId, sink: &Sink<SlotInfo>) { pub fn add_slot_subscription(&self, sub_id: &SubscriptionId, sink: &Sink<SlotInfo>) {
@ -316,19 +368,120 @@ impl RpcSubscriptions {
} }
pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) { pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) {
let subscriptions = self.slot_subscriptions.read().unwrap(); self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
for (_, sink) in subscriptions.iter() { }
sink.notify(Ok(SlotInfo { slot, parent, root }))
.wait() fn enqueue_notification(&self, notification_entry: NotificationEntry) {
.unwrap(); match self
.notification_sender
.lock()
.unwrap()
.send(Arc::new(Mutex::new(notification_entry)))
{
Ok(()) => (),
Err(SendError(notification)) => {
warn!(
"Dropped RPC Notification - receiver disconnected : {:?}",
notification
);
}
}
}
fn process_notifications(
exit: Arc<AtomicBool>,
notification_receiver: Receiver<Arc<Mutex<NotificationEntry>>>,
account_subscriptions: Arc<RpcAccountSubscriptions>,
program_subscriptions: Arc<RpcProgramSubscriptions>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
slot_subscriptions: Arc<RpcSlotSubscriptions>,
) {
loop {
if exit.load(Ordering::Relaxed) {
break;
}
match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) {
Ok(notification_entry) => {
let mut notification_entry = notification_entry.lock().unwrap();
match notification_entry.deref_mut() {
NotificationEntry::Slot(slot_info) => {
let subscriptions = slot_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() {
sink.notify(Ok(*slot_info)).wait().unwrap();
}
}
NotificationEntry::Bank((current_slot, bank_forks)) => {
let pubkeys: Vec<_> = {
let subs = account_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for pubkey in &pubkeys {
Self::check_account(
pubkey,
*current_slot,
&bank_forks,
account_subscriptions.clone(),
);
}
let programs: Vec<_> = {
let subs = program_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for program_id in &programs {
Self::check_program(
program_id,
*current_slot,
&bank_forks,
program_subscriptions.clone(),
);
}
let signatures: Vec<_> = {
let subs = signature_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for signature in &signatures {
Self::check_signature(
signature,
*current_slot,
&bank_forks,
signature_subscriptions.clone(),
);
}
}
}
}
Err(RecvTimeoutError::Timeout) => {
// not a problem - try reading again
}
Err(RecvTimeoutError::Disconnected) => {
warn!("RPC Notification thread - sender disconnected");
break;
}
}
}
}
fn shutdown(&mut self) -> std::thread::Result<()> {
if self.t_cleanup.is_some() {
info!("RPC Notification thread - shutting down");
self.exit.store(true, Ordering::Relaxed);
let x = self.t_cleanup.take().unwrap().join();
info!("RPC Notification thread - shut down.");
x
} else {
warn!("RPC Notification thread - already shut down.");
Ok(())
} }
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { pub(crate) mod tests {
use super::*; use super::*;
use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use jsonrpc_core::futures;
use jsonrpc_pubsub::typed::Subscriber; use jsonrpc_pubsub::typed::Subscriber;
use solana_budget_program; use solana_budget_program;
use solana_sdk::{ use solana_sdk::{
@ -337,6 +490,26 @@ mod tests {
}; };
use tokio::prelude::{Async, Stream}; use tokio::prelude::{Async, Stream};
pub(crate) fn robust_poll<T>(
mut receiver: futures::sync::mpsc::Receiver<T>,
) -> Result<T, RecvTimeoutError> {
const INITIAL_DELAY_MS: u64 = RECEIVE_DELAY_MILLIS * 2;
std::thread::sleep(Duration::from_millis(INITIAL_DELAY_MS));
for _i in 0..5 {
let found = receiver.poll();
if let Ok(Async::Ready(Some(result))) = found {
return Ok(result);
}
std::thread::sleep(Duration::from_millis(RECEIVE_DELAY_MILLIS));
}
Err(RecvTimeoutError::Timeout)
}
pub(crate) fn robust_poll_or_panic<T>(receiver: futures::sync::mpsc::Receiver<T>) -> T {
robust_poll(receiver).unwrap_or_else(|err| panic!("expected response! {}", err))
}
#[test] #[test]
fn test_check_account_subscribe() { fn test_check_account_subscribe() {
let GenesisConfigInfo { let GenesisConfigInfo {
@ -364,11 +537,12 @@ mod tests {
.process_transaction(&tx) .process_transaction(&tx)
.unwrap(); .unwrap();
let (subscriber, _id_receiver, mut transport_receiver) = let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("accountNotification"); Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default(); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_account_subscription(&alice.pubkey(), None, &sub_id, &sink); subscriptions.add_account_subscription(&alice.pubkey(), None, &sub_id, &sink);
assert!(subscriptions assert!(subscriptions
@ -377,14 +551,12 @@ mod tests {
.unwrap() .unwrap()
.contains_key(&alice.pubkey())); .contains_key(&alice.pubkey()));
subscriptions.check_account(&alice.pubkey(), 0, &bank_forks); subscriptions.notify_subscribers(0, &bank_forks);
let string = transport_receiver.poll(); let response = robust_poll_or_panic(transport_receiver);
if let Async::Ready(Some(response)) = string.unwrap() { let expected = format!(
let expected = format!( r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"subscription":0}}}}"#
r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"subscription":0}}}}"# );
); assert_eq!(expected, response);
assert_eq!(expected, response);
}
subscriptions.remove_account_subscription(&sub_id); subscriptions.remove_account_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions
@ -421,11 +593,12 @@ mod tests {
.process_transaction(&tx) .process_transaction(&tx)
.unwrap(); .unwrap();
let (subscriber, _id_receiver, mut transport_receiver) = let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("programNotification"); Subscriber::new_test("programNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default(); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_program_subscription(&solana_budget_program::id(), None, &sub_id, &sink); subscriptions.add_program_subscription(&solana_budget_program::id(), None, &sub_id, &sink);
assert!(subscriptions assert!(subscriptions
@ -434,15 +607,13 @@ mod tests {
.unwrap() .unwrap()
.contains_key(&solana_budget_program::id())); .contains_key(&solana_budget_program::id()));
subscriptions.check_program(&solana_budget_program::id(), 0, &bank_forks); subscriptions.notify_subscribers(0, &bank_forks);
let string = transport_receiver.poll(); let response = robust_poll_or_panic(transport_receiver);
if let Async::Ready(Some(response)) = string.unwrap() { let expected = format!(
let expected = format!( r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":{{"account":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"pubkey":"{:?}"}},"subscription":0}}}}"#,
r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":{{"account":{{"data":"1111111111111111","executable":false,"lamports":1,"owner":"Budget1111111111111111111111111111111111111","rentEpoch":1}},"pubkey":"{:?}"}},"subscription":0}}}}"#, alice.pubkey()
alice.pubkey() );
); assert_eq!(expected, response);
assert_eq!(expected, response);
}
subscriptions.remove_program_subscription(&sub_id); subscriptions.remove_program_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions
@ -472,11 +643,12 @@ mod tests {
.process_transaction(&tx) .process_transaction(&tx)
.unwrap(); .unwrap();
let (subscriber, _id_receiver, mut transport_receiver) = let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("signatureNotification"); Subscriber::new_test("signatureNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default(); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink); subscriptions.add_signature_subscription(&signature, None, &sub_id, &sink);
assert!(subscriptions assert!(subscriptions
@ -485,18 +657,16 @@ mod tests {
.unwrap() .unwrap()
.contains_key(&signature)); .contains_key(&signature));
subscriptions.check_signature(&signature, 0, &bank_forks); subscriptions.notify_subscribers(0, &bank_forks);
let string = transport_receiver.poll(); let response = robust_poll_or_panic(transport_receiver);
if let Async::Ready(Some(response)) = string.unwrap() { let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
let expected_res: Option<transaction::Result<()>> = Some(Ok(())); let expected_res_str =
let expected_res_str = serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap();
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); let expected = format!(
let expected = format!( r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#,
r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str
expected_res_str );
); assert_eq!(expected, response);
assert_eq!(expected, response);
}
subscriptions.remove_signature_subscription(&sub_id); subscriptions.remove_signature_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions
@ -507,11 +677,12 @@ mod tests {
} }
#[test] #[test]
fn test_check_slot_subscribe() { fn test_check_slot_subscribe() {
let (subscriber, _id_receiver, mut transport_receiver) = let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("slotNotification"); Subscriber::new_test("slotNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default(); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_slot_subscription(&sub_id, &sink); subscriptions.add_slot_subscription(&sub_id, &sink);
assert!(subscriptions assert!(subscriptions
@ -521,21 +692,19 @@ mod tests {
.contains_key(&sub_id)); .contains_key(&sub_id));
subscriptions.notify_slot(0, 0, 0); subscriptions.notify_slot(0, 0, 0);
let string = transport_receiver.poll(); let response = robust_poll_or_panic(transport_receiver);
if let Async::Ready(Some(response)) = string.unwrap() { let expected_res = SlotInfo {
let expected_res = SlotInfo { parent: 0,
parent: 0, slot: 0,
slot: 0, root: 0,
root: 0, };
}; let expected_res_str =
let expected_res_str = serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap();
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); let expected = format!(
let expected = format!( r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#,
r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str
expected_res_str );
); assert_eq!(expected, response);
assert_eq!(expected, response);
}
subscriptions.remove_slot_subscription(&sub_id); subscriptions.remove_slot_subscription(&sub_id);
assert!(!subscriptions assert!(!subscriptions

View File

@ -303,7 +303,7 @@ pub mod tests {
None, None,
None, None,
l_receiver, l_receiver,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::new(&exit)),
&poh_recorder, &poh_recorder,
&leader_schedule_cache, &leader_schedule_cache,
&exit, &exit,

View File

@ -215,7 +215,7 @@ impl Validator {
)) ))
}; };
let subscriptions = Arc::new(RpcSubscriptions::default()); let subscriptions = Arc::new(RpcSubscriptions::new(&exit));
let rpc_pubsub_service = if node.info.rpc_pubsub.port() == 0 { let rpc_pubsub_service = if node.info.rpc_pubsub.port() == 0 {
None None
} else { } else {