RPC subscriptions for new slot notifications (#7114)

* feat: slot notifications via pubsub rpc w/ tests
This commit is contained in:
Sunny Gleason 2019-11-26 03:42:54 -05:00 committed by GitHub
parent 58c144ee55
commit 0a0f15baca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 181 additions and 6 deletions

View File

@ -233,6 +233,7 @@ impl ReplayStage {
&blocktree,
&mut bank_forks.write().unwrap(),
&leader_schedule_cache,
&subscriptions,
);
datapoint_debug!(
"replay_stage-memory",
@ -370,6 +371,7 @@ impl ReplayStage {
&bank_forks,
&poh_recorder,
&leader_schedule_cache,
&subscriptions,
);
if let Some(bank) = poh_recorder.lock().unwrap().bank() {
@ -442,6 +444,7 @@ impl ReplayStage {
bank_forks: &Arc<RwLock<BankForks>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
subscriptions: &Arc<RpcSubscriptions>,
) {
// all the individual calls to poh_recorder.lock() are designed to
// increase granularity, decrease contention
@ -496,7 +499,12 @@ impl ReplayStage {
("leader", next_leader.to_string(), String),
);
info!("new fork:{} parent:{} (leader)", poh_slot, parent_slot);
let root_slot = bank_forks.read().unwrap().root();
info!(
"new fork:{} parent:{} (leader) root:{}",
poh_slot, parent_slot, root_slot
);
subscriptions.notify_slot(poh_slot, parent_slot, root_slot);
let tpu_bank = bank_forks
.write()
.unwrap()
@ -1085,6 +1093,7 @@ impl ReplayStage {
blocktree: &Blocktree,
forks: &mut BankForks,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
subscriptions: &Arc<RpcSubscriptions>,
) {
// Find the next slot that chains to the old slot
let frozen_banks = forks.frozen_banks();
@ -1111,7 +1120,13 @@ impl ReplayStage {
let leader = leader_schedule_cache
.slot_leader_at(child_slot, Some(&parent_bank))
.unwrap();
info!("new fork:{} parent:{}", child_slot, parent_slot);
info!(
"new fork:{} parent:{} root:{}",
child_slot,
parent_slot,
forks.root()
);
subscriptions.notify_slot(child_slot, parent_slot, forks.root());
forks.insert(Bank::new_from_parent(&parent_bank, &leader, child_slot));
}
}
@ -1170,6 +1185,7 @@ pub(crate) mod tests {
let genesis_config = create_genesis_config(10_000).genesis_config;
let bank0 = Bank::new(&genesis_config);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let subscriptions = Arc::new(RpcSubscriptions::default());
let mut bank_forks = BankForks::new(0, bank0);
bank_forks.working_bank().freeze();
@ -1181,6 +1197,7 @@ pub(crate) mod tests {
&blocktree,
&mut bank_forks,
&leader_schedule_cache,
&subscriptions,
);
assert!(bank_forks.get(1).is_some());
@ -1192,6 +1209,7 @@ pub(crate) mod tests {
&blocktree,
&mut bank_forks,
&leader_schedule_cache,
&subscriptions,
);
assert!(bank_forks.get(1).is_some());
assert!(bank_forks.get(2).is_some());

View File

@ -1,6 +1,6 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions};
use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions, SlotInfo};
use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::typed::Subscriber;
@ -87,6 +87,18 @@ pub trait RpcSolPubSub {
name = "signatureUnsubscribe"
)]
fn signature_unsubscribe(&self, _: Option<Self::Metadata>, _: SubscriptionId) -> Result<bool>;
// Get notification when slot is encountered
#[pubsub(subscription = "slotNotification", subscribe, name = "slotSubscribe")]
fn slot_subscribe(&self, _: Self::Metadata, _: Subscriber<SlotInfo>);
// Unsubscribe from slot notification subscription.
#[pubsub(
subscription = "slotNotification",
unsubscribe,
name = "slotUnsubscribe"
)]
fn slot_unsubscribe(&self, _: Option<Self::Metadata>, _: SubscriptionId) -> Result<bool>;
}
#[derive(Default)]
@ -236,6 +248,29 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
})
}
}
fn slot_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<SlotInfo>) {
info!("slot_subscribe");
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
let sub_id = SubscriptionId::Number(id as u64);
info!("slot_subscribe: id={:?}", sub_id);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
self.subscriptions.add_slot_subscription(&sub_id, &sink);
}
fn slot_unsubscribe(&self, _meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
info!("slot_unsubscribe");
if self.subscriptions.remove_slot_subscription(&id) {
Ok(true)
} else {
Err(Error {
code: ErrorCode::InvalidParams,
message: "Invalid Request: Subscription id does not exist".into(),
data: None,
})
}
}
}
#[cfg(test)]
@ -585,4 +620,60 @@ mod tests {
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
}
}
#[test]
fn test_slot_subscribe() {
let rpc = RpcSolPubSubImpl::default();
let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("slotNotification");
rpc.slot_subscribe(session, subscriber);
rpc.subscriptions.notify_slot(0, 0, 0);
// Test slot confirmation notification
let string = receiver.poll();
if let Async::Ready(Some(response)) = string.unwrap() {
let expected_res = SlotInfo {
parent: 0,
slot: 0,
root: 0,
};
let expected_res_str =
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap();
let expected = format!(r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str);
assert_eq!(expected, response);
}
}
#[test]
fn test_slot_unsubscribe() {
let rpc = RpcSolPubSubImpl::default();
let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("slotNotification");
rpc.slot_subscribe(session, subscriber);
rpc.subscriptions.notify_slot(0, 0, 0);
let string = receiver.poll();
if let Async::Ready(Some(response)) = string.unwrap() {
let expected_res = SlotInfo {
parent: 0,
slot: 0,
root: 0,
};
let expected_res_str =
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap();
let expected = format!(r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str);
assert_eq!(expected, response);
}
let session = create_session();
assert!(rpc
.slot_unsubscribe(Some(session), SubscriptionId::Number(42))
.is_err());
let session = create_session();
assert!(rpc
.slot_unsubscribe(Some(session), SubscriptionId::Number(0))
.is_ok());
}
}

View File

@ -15,6 +15,13 @@ use std::sync::{Arc, RwLock};
pub type Confirmations = usize;
#[derive(Serialize, Clone)]
pub struct SlotInfo {
pub slot: Slot,
pub parent: Slot,
pub root: Slot,
}
type RpcAccountSubscriptions =
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<Account>, Confirmations)>>>;
type RpcProgramSubscriptions =
@ -22,6 +29,7 @@ type RpcProgramSubscriptions =
type RpcSignatureSubscriptions = RwLock<
HashMap<Signature, HashMap<SubscriptionId, (Sink<transaction::Result<()>>, Confirmations)>>,
>;
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
fn add_subscription<K, S>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>,
@ -119,7 +127,7 @@ fn check_confirmations_and_notify<K, S, F, N, X>(
}
}
fn notify_account<S>(result: Option<(S, u64)>, sink: &Sink<S>, root: u64)
fn notify_account<S>(result: Option<(S, Slot)>, sink: &Sink<S>, root: Slot)
where
S: Clone + Serialize,
{
@ -130,7 +138,7 @@ where
}
}
fn notify_signature<S>(result: Option<S>, sink: &Sink<S>, _root: u64)
fn notify_signature<S>(result: Option<S>, sink: &Sink<S>, _root: Slot)
where
S: Clone + Serialize,
{
@ -139,7 +147,7 @@ where
}
}
fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink<(String, Account)>, _root: u64) {
fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink<(String, Account)>, _root: Slot) {
for (pubkey, account) in accounts.iter() {
sink.notify(Ok((pubkey.to_string(), account.clone())))
.wait()
@ -151,6 +159,7 @@ pub struct RpcSubscriptions {
account_subscriptions: RpcAccountSubscriptions,
program_subscriptions: RpcProgramSubscriptions,
signature_subscriptions: RpcSignatureSubscriptions,
slot_subscriptions: RpcSlotSubscriptions,
}
impl Default for RpcSubscriptions {
@ -159,6 +168,7 @@ impl Default for RpcSubscriptions {
account_subscriptions: RpcAccountSubscriptions::default(),
program_subscriptions: RpcProgramSubscriptions::default(),
signature_subscriptions: RpcSignatureSubscriptions::default(),
slot_subscriptions: RpcSlotSubscriptions::default(),
}
}
}
@ -291,6 +301,26 @@ impl RpcSubscriptions {
self.check_signature(signature, current_slot, bank_forks);
}
}
pub fn add_slot_subscription(&self, sub_id: &SubscriptionId, sink: &Sink<SlotInfo>) {
let mut subscriptions = self.slot_subscriptions.write().unwrap();
subscriptions.insert(sub_id.clone(), sink.clone());
}
pub fn remove_slot_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.slot_subscriptions.write().unwrap();
subscriptions.remove(id).is_some()
}
pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) {
info!("notify_slot!! {} from {} (root={})", slot, parent, root);
let subscriptions = self.slot_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() {
sink.notify(Ok(SlotInfo { slot, parent, root }))
.wait()
.unwrap();
}
}
}
#[cfg(test)]
@ -463,4 +493,40 @@ mod tests {
.unwrap()
.contains_key(&signature));
}
#[test]
fn test_check_slot_subscribe() {
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("slotNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default();
subscriptions.add_slot_subscription(&sub_id, &sink);
assert!(subscriptions
.slot_subscriptions
.read()
.unwrap()
.contains_key(&sub_id));
subscriptions.notify_slot(0, 0, 0);
let string = transport_receiver.poll();
if let Async::Ready(Some(response)) = string.unwrap() {
let expected_res = SlotInfo {
parent: 0,
slot: 0,
root: 0,
};
let expected_res_str =
serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap();
let expected = format!(r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str);
assert_eq!(expected, response);
}
subscriptions.remove_slot_subscription(&sub_id);
assert!(!subscriptions
.slot_subscriptions
.read()
.unwrap()
.contains_key(&sub_id));
}
}