From 0a0f15baca534957ea4e301b7c9f6ed9c5bf228a Mon Sep 17 00:00:00 2001 From: Sunny Gleason Date: Tue, 26 Nov 2019 03:42:54 -0500 Subject: [PATCH] RPC subscriptions for new slot notifications (#7114) * feat: slot notifications via pubsub rpc w/ tests --- core/src/replay_stage.rs | 22 ++++++++- core/src/rpc_pubsub.rs | 93 ++++++++++++++++++++++++++++++++++- core/src/rpc_subscriptions.rs | 72 +++++++++++++++++++++++++-- 3 files changed, 181 insertions(+), 6 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5b37277207..34d81a456a 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -233,6 +233,7 @@ impl ReplayStage { &blocktree, &mut bank_forks.write().unwrap(), &leader_schedule_cache, + &subscriptions, ); datapoint_debug!( "replay_stage-memory", @@ -370,6 +371,7 @@ impl ReplayStage { &bank_forks, &poh_recorder, &leader_schedule_cache, + &subscriptions, ); if let Some(bank) = poh_recorder.lock().unwrap().bank() { @@ -442,6 +444,7 @@ impl ReplayStage { bank_forks: &Arc>, poh_recorder: &Arc>, leader_schedule_cache: &Arc, + subscriptions: &Arc, ) { // all the individual calls to poh_recorder.lock() are designed to // increase granularity, decrease contention @@ -496,7 +499,12 @@ impl ReplayStage { ("leader", next_leader.to_string(), String), ); - info!("new fork:{} parent:{} (leader)", poh_slot, parent_slot); + let root_slot = bank_forks.read().unwrap().root(); + info!( + "new fork:{} parent:{} (leader) root:{}", + poh_slot, parent_slot, root_slot + ); + subscriptions.notify_slot(poh_slot, parent_slot, root_slot); let tpu_bank = bank_forks .write() .unwrap() @@ -1085,6 +1093,7 @@ impl ReplayStage { blocktree: &Blocktree, forks: &mut BankForks, leader_schedule_cache: &Arc, + subscriptions: &Arc, ) { // Find the next slot that chains to the old slot let frozen_banks = forks.frozen_banks(); @@ -1111,7 +1120,13 @@ impl ReplayStage { let leader = leader_schedule_cache .slot_leader_at(child_slot, Some(&parent_bank)) .unwrap(); - info!("new fork:{} parent:{}", child_slot, parent_slot); + info!( + "new fork:{} parent:{} root:{}", + child_slot, + parent_slot, + forks.root() + ); + subscriptions.notify_slot(child_slot, parent_slot, forks.root()); forks.insert(Bank::new_from_parent(&parent_bank, &leader, child_slot)); } } @@ -1170,6 +1185,7 @@ pub(crate) mod tests { let genesis_config = create_genesis_config(10_000).genesis_config; let bank0 = Bank::new(&genesis_config); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); + let subscriptions = Arc::new(RpcSubscriptions::default()); let mut bank_forks = BankForks::new(0, bank0); bank_forks.working_bank().freeze(); @@ -1181,6 +1197,7 @@ pub(crate) mod tests { &blocktree, &mut bank_forks, &leader_schedule_cache, + &subscriptions, ); assert!(bank_forks.get(1).is_some()); @@ -1192,6 +1209,7 @@ pub(crate) mod tests { &blocktree, &mut bank_forks, &leader_schedule_cache, + &subscriptions, ); assert!(bank_forks.get(1).is_some()); assert!(bank_forks.get(2).is_some()); diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index a28001f7ae..f3bfdf34cd 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -1,6 +1,6 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions}; +use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions, SlotInfo}; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::typed::Subscriber; @@ -87,6 +87,18 @@ pub trait RpcSolPubSub { name = "signatureUnsubscribe" )] fn signature_unsubscribe(&self, _: Option, _: SubscriptionId) -> Result; + + // Get notification when slot is encountered + #[pubsub(subscription = "slotNotification", subscribe, name = "slotSubscribe")] + fn slot_subscribe(&self, _: Self::Metadata, _: Subscriber); + + // Unsubscribe from slot notification subscription. + #[pubsub( + subscription = "slotNotification", + unsubscribe, + name = "slotUnsubscribe" + )] + fn slot_unsubscribe(&self, _: Option, _: SubscriptionId) -> Result; } #[derive(Default)] @@ -236,6 +248,29 @@ impl RpcSolPubSub for RpcSolPubSubImpl { }) } } + + fn slot_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { + info!("slot_subscribe"); + let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); + let sub_id = SubscriptionId::Number(id as u64); + info!("slot_subscribe: id={:?}", sub_id); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + + self.subscriptions.add_slot_subscription(&sub_id, &sink); + } + + fn slot_unsubscribe(&self, _meta: Option, id: SubscriptionId) -> Result { + info!("slot_unsubscribe"); + if self.subscriptions.remove_slot_subscription(&id) { + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } } #[cfg(test)] @@ -585,4 +620,60 @@ mod tests { assert_eq!(serde_json::to_string(&expected).unwrap(), response); } } + + #[test] + fn test_slot_subscribe() { + let rpc = RpcSolPubSubImpl::default(); + let session = create_session(); + let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("slotNotification"); + rpc.slot_subscribe(session, subscriber); + + rpc.subscriptions.notify_slot(0, 0, 0); + + // Test slot confirmation notification + let string = receiver.poll(); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected_res = SlotInfo { + parent: 0, + slot: 0, + root: 0, + }; + let expected_res_str = + serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); + let expected = format!(r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str); + assert_eq!(expected, response); + } + } + + #[test] + fn test_slot_unsubscribe() { + let rpc = RpcSolPubSubImpl::default(); + let session = create_session(); + let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("slotNotification"); + rpc.slot_subscribe(session, subscriber); + rpc.subscriptions.notify_slot(0, 0, 0); + + let string = receiver.poll(); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected_res = SlotInfo { + parent: 0, + slot: 0, + root: 0, + }; + let expected_res_str = + serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); + let expected = format!(r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str); + assert_eq!(expected, response); + } + + let session = create_session(); + assert!(rpc + .slot_unsubscribe(Some(session), SubscriptionId::Number(42)) + .is_err()); + + let session = create_session(); + assert!(rpc + .slot_unsubscribe(Some(session), SubscriptionId::Number(0)) + .is_ok()); + } } diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index a4074fc8f3..c9204d8764 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -15,6 +15,13 @@ use std::sync::{Arc, RwLock}; pub type Confirmations = usize; +#[derive(Serialize, Clone)] +pub struct SlotInfo { + pub slot: Slot, + pub parent: Slot, + pub root: Slot, +} + type RpcAccountSubscriptions = RwLock, Confirmations)>>>; type RpcProgramSubscriptions = @@ -22,6 +29,7 @@ type RpcProgramSubscriptions = type RpcSignatureSubscriptions = RwLock< HashMap>, Confirmations)>>, >; +type RpcSlotSubscriptions = RwLock>>; fn add_subscription( subscriptions: &mut HashMap, Confirmations)>>, @@ -119,7 +127,7 @@ fn check_confirmations_and_notify( } } -fn notify_account(result: Option<(S, u64)>, sink: &Sink, root: u64) +fn notify_account(result: Option<(S, Slot)>, sink: &Sink, root: Slot) where S: Clone + Serialize, { @@ -130,7 +138,7 @@ where } } -fn notify_signature(result: Option, sink: &Sink, _root: u64) +fn notify_signature(result: Option, sink: &Sink, _root: Slot) where S: Clone + Serialize, { @@ -139,7 +147,7 @@ where } } -fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink<(String, Account)>, _root: u64) { +fn notify_program(accounts: Vec<(Pubkey, Account)>, sink: &Sink<(String, Account)>, _root: Slot) { for (pubkey, account) in accounts.iter() { sink.notify(Ok((pubkey.to_string(), account.clone()))) .wait() @@ -151,6 +159,7 @@ pub struct RpcSubscriptions { account_subscriptions: RpcAccountSubscriptions, program_subscriptions: RpcProgramSubscriptions, signature_subscriptions: RpcSignatureSubscriptions, + slot_subscriptions: RpcSlotSubscriptions, } impl Default for RpcSubscriptions { @@ -159,6 +168,7 @@ impl Default for RpcSubscriptions { account_subscriptions: RpcAccountSubscriptions::default(), program_subscriptions: RpcProgramSubscriptions::default(), signature_subscriptions: RpcSignatureSubscriptions::default(), + slot_subscriptions: RpcSlotSubscriptions::default(), } } } @@ -291,6 +301,26 @@ impl RpcSubscriptions { self.check_signature(signature, current_slot, bank_forks); } } + + pub fn add_slot_subscription(&self, sub_id: &SubscriptionId, sink: &Sink) { + let mut subscriptions = self.slot_subscriptions.write().unwrap(); + subscriptions.insert(sub_id.clone(), sink.clone()); + } + + pub fn remove_slot_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self.slot_subscriptions.write().unwrap(); + subscriptions.remove(id).is_some() + } + + pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) { + info!("notify_slot!! {} from {} (root={})", slot, parent, root); + let subscriptions = self.slot_subscriptions.read().unwrap(); + for (_, sink) in subscriptions.iter() { + sink.notify(Ok(SlotInfo { slot, parent, root })) + .wait() + .unwrap(); + } + } } #[cfg(test)] @@ -463,4 +493,40 @@ mod tests { .unwrap() .contains_key(&signature)); } + #[test] + fn test_check_slot_subscribe() { + let (subscriber, _id_receiver, mut transport_receiver) = + Subscriber::new_test("slotNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let subscriptions = RpcSubscriptions::default(); + subscriptions.add_slot_subscription(&sub_id, &sink); + + assert!(subscriptions + .slot_subscriptions + .read() + .unwrap() + .contains_key(&sub_id)); + + subscriptions.notify_slot(0, 0, 0); + let string = transport_receiver.poll(); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected_res = SlotInfo { + parent: 0, + slot: 0, + root: 0, + }; + let expected_res_str = + serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); + let expected = format!(r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str); + assert_eq!(expected, response); + } + + subscriptions.remove_slot_subscription(&sub_id); + assert!(!subscriptions + .slot_subscriptions + .read() + .unwrap() + .contains_key(&sub_id)); + } }