parent
c242d66130
commit
c1a3b6ecc2
|
@ -334,6 +334,7 @@ impl ReplayStage {
|
|||
&latest_root_senders,
|
||||
&mut earliest_vote_on_fork,
|
||||
&mut all_pubkeys,
|
||||
&subscriptions,
|
||||
)?;
|
||||
|
||||
ancestors
|
||||
|
@ -712,6 +713,7 @@ impl ReplayStage {
|
|||
latest_root_senders: &[Sender<Slot>],
|
||||
earliest_vote_on_fork: &mut Slot,
|
||||
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||
subscriptions: &Arc<RpcSubscriptions>,
|
||||
) -> Result<()> {
|
||||
if bank.is_empty() {
|
||||
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
|
||||
|
@ -745,6 +747,7 @@ impl ReplayStage {
|
|||
earliest_vote_on_fork,
|
||||
all_pubkeys,
|
||||
);
|
||||
subscriptions.notify_roots(rooted_slots);
|
||||
latest_root_senders.iter().for_each(|s| {
|
||||
if let Err(e) = s.send(new_root) {
|
||||
trace!("latest root send failed: {:?}", e);
|
||||
|
|
|
@ -5,7 +5,7 @@ use jsonrpc_core::{Error, ErrorCode, Result};
|
|||
use jsonrpc_derive::rpc;
|
||||
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
|
||||
use solana_client::rpc_response::{Response as RpcResponse, RpcAccount, RpcKeyedAccount};
|
||||
use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction};
|
||||
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature, transaction};
|
||||
use std::sync::{atomic, Arc};
|
||||
|
||||
// Suppress needless_return due to
|
||||
|
@ -102,6 +102,18 @@ pub trait RpcSolPubSub {
|
|||
name = "slotUnsubscribe"
|
||||
)]
|
||||
fn slot_unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool>;
|
||||
|
||||
// Get notification when a new root is set
|
||||
#[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")]
|
||||
fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<Slot>);
|
||||
|
||||
// Unsubscribe from slot notification subscription.
|
||||
#[pubsub(
|
||||
subscription = "rootNotification",
|
||||
unsubscribe,
|
||||
name = "rootUnsubscribe"
|
||||
)]
|
||||
fn root_unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool>;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
@ -274,6 +286,27 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<Slot>) {
|
||||
info!("root_subscribe");
|
||||
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
|
||||
let sub_id = SubscriptionId::Number(id as u64);
|
||||
info!("root_subscribe: id={:?}", sub_id);
|
||||
self.subscriptions.add_root_subscription(sub_id, subscriber);
|
||||
}
|
||||
|
||||
fn root_unsubscribe(&self, _meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
|
||||
info!("root_unsubscribe");
|
||||
if self.subscriptions.remove_root_subscription(&id) {
|
||||
Ok(true)
|
||||
} else {
|
||||
Err(Error {
|
||||
code: ErrorCode::InvalidParams,
|
||||
message: "Invalid Request: Subscription id does not exist".into(),
|
||||
data: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -338,7 +371,7 @@ mod tests {
|
|||
process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap();
|
||||
|
||||
// Test signature confirmation notification
|
||||
let response = robust_poll_or_panic(receiver);
|
||||
let (response, _) = robust_poll_or_panic(receiver);
|
||||
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
|
||||
let expected = json!({
|
||||
"jsonrpc": "2.0",
|
||||
|
@ -480,7 +513,7 @@ mod tests {
|
|||
}
|
||||
});
|
||||
|
||||
let response = robust_poll_or_panic(receiver);
|
||||
let (response, _) = robust_poll_or_panic(receiver);
|
||||
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||
|
||||
let tx = system_transaction::transfer(&alice, &witness.pubkey(), 1, blockhash);
|
||||
|
@ -629,7 +662,7 @@ mod tests {
|
|||
"subscription": 0,
|
||||
}
|
||||
});
|
||||
let response = robust_poll_or_panic(receiver);
|
||||
let (response, _) = robust_poll_or_panic(receiver);
|
||||
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||
}
|
||||
|
||||
|
@ -642,7 +675,7 @@ mod tests {
|
|||
|
||||
rpc.subscriptions.notify_slot(0, 0, 0);
|
||||
// Test slot confirmation notification
|
||||
let response = robust_poll_or_panic(receiver);
|
||||
let (response, _) = robust_poll_or_panic(receiver);
|
||||
let expected_res = SlotInfo {
|
||||
parent: 0,
|
||||
slot: 0,
|
||||
|
@ -664,7 +697,7 @@ mod tests {
|
|||
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification");
|
||||
rpc.slot_subscribe(session, subscriber);
|
||||
rpc.subscriptions.notify_slot(0, 0, 0);
|
||||
let response = robust_poll_or_panic(receiver);
|
||||
let (response, _) = robust_poll_or_panic(receiver);
|
||||
let expected_res = SlotInfo {
|
||||
parent: 0,
|
||||
slot: 0,
|
||||
|
|
|
@ -38,12 +38,14 @@ pub struct SlotInfo {
|
|||
|
||||
enum NotificationEntry {
|
||||
Slot(SlotInfo),
|
||||
Root(Slot),
|
||||
Bank((Slot, Arc<RwLock<BankForks>>)),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for NotificationEntry {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
NotificationEntry::Root(root) => write!(f, "Root({})", root),
|
||||
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
|
||||
NotificationEntry::Bank((current_slot, _)) => {
|
||||
write!(f, "Bank({{current_slot: {:?}}})", current_slot)
|
||||
|
@ -64,6 +66,7 @@ type RpcSignatureSubscriptions = RwLock<
|
|||
>,
|
||||
>;
|
||||
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
|
||||
type RpcRootSubscriptions = RwLock<HashMap<SubscriptionId, Sink<Slot>>>;
|
||||
|
||||
fn add_subscription<K, S>(
|
||||
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>,
|
||||
|
@ -224,6 +227,7 @@ pub struct RpcSubscriptions {
|
|||
program_subscriptions: Arc<RpcProgramSubscriptions>,
|
||||
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
|
||||
slot_subscriptions: Arc<RpcSlotSubscriptions>,
|
||||
root_subscriptions: Arc<RpcRootSubscriptions>,
|
||||
notification_sender: Arc<Mutex<Sender<NotificationEntry>>>,
|
||||
t_cleanup: Option<JoinHandle<()>>,
|
||||
notifier_runtime: Option<Runtime>,
|
||||
|
@ -255,6 +259,7 @@ impl RpcSubscriptions {
|
|||
let program_subscriptions = Arc::new(RpcProgramSubscriptions::default());
|
||||
let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default());
|
||||
let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default());
|
||||
let root_subscriptions = Arc::new(RpcRootSubscriptions::default());
|
||||
let notification_sender = Arc::new(Mutex::new(notification_sender));
|
||||
|
||||
let exit_clone = exit.clone();
|
||||
|
@ -262,6 +267,7 @@ impl RpcSubscriptions {
|
|||
let program_subscriptions_clone = program_subscriptions.clone();
|
||||
let signature_subscriptions_clone = signature_subscriptions.clone();
|
||||
let slot_subscriptions_clone = slot_subscriptions.clone();
|
||||
let root_subscriptions_clone = root_subscriptions.clone();
|
||||
|
||||
let notifier_runtime = RuntimeBuilder::new()
|
||||
.core_threads(1)
|
||||
|
@ -281,6 +287,7 @@ impl RpcSubscriptions {
|
|||
program_subscriptions_clone,
|
||||
signature_subscriptions_clone,
|
||||
slot_subscriptions_clone,
|
||||
root_subscriptions_clone,
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
|
@ -290,6 +297,7 @@ impl RpcSubscriptions {
|
|||
program_subscriptions,
|
||||
signature_subscriptions,
|
||||
slot_subscriptions,
|
||||
root_subscriptions,
|
||||
notification_sender,
|
||||
notifier_runtime: Some(notifier_runtime),
|
||||
t_cleanup: Some(t_cleanup),
|
||||
|
@ -447,6 +455,24 @@ impl RpcSubscriptions {
|
|||
self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
|
||||
}
|
||||
|
||||
pub fn add_root_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<Slot>) {
|
||||
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||
let mut subscriptions = self.root_subscriptions.write().unwrap();
|
||||
subscriptions.insert(sub_id, sink);
|
||||
}
|
||||
|
||||
pub fn remove_root_subscription(&self, id: &SubscriptionId) -> bool {
|
||||
let mut subscriptions = self.root_subscriptions.write().unwrap();
|
||||
subscriptions.remove(id).is_some()
|
||||
}
|
||||
|
||||
pub fn notify_roots(&self, mut rooted_slots: Vec<Slot>) {
|
||||
rooted_slots.sort();
|
||||
rooted_slots.into_iter().for_each(|root| {
|
||||
self.enqueue_notification(NotificationEntry::Root(root));
|
||||
});
|
||||
}
|
||||
|
||||
fn enqueue_notification(&self, notification_entry: NotificationEntry) {
|
||||
match self
|
||||
.notification_sender
|
||||
|
@ -472,6 +498,7 @@ impl RpcSubscriptions {
|
|||
program_subscriptions: Arc<RpcProgramSubscriptions>,
|
||||
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
|
||||
slot_subscriptions: Arc<RpcSlotSubscriptions>,
|
||||
root_subscriptions: Arc<RpcRootSubscriptions>,
|
||||
) {
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
|
@ -485,6 +512,12 @@ impl RpcSubscriptions {
|
|||
notifier.notify(slot_info, sink);
|
||||
}
|
||||
}
|
||||
NotificationEntry::Root(root) => {
|
||||
let subscriptions = root_subscriptions.read().unwrap();
|
||||
for (_, sink) in subscriptions.iter() {
|
||||
notifier.notify(root, sink);
|
||||
}
|
||||
}
|
||||
NotificationEntry::Bank((current_slot, bank_forks)) => {
|
||||
let pubkeys: Vec<_> = {
|
||||
let subs = account_subscriptions.read().unwrap();
|
||||
|
@ -576,7 +609,7 @@ pub(crate) mod tests {
|
|||
|
||||
pub(crate) fn robust_poll_or_panic<T: Debug + Send + 'static>(
|
||||
receiver: futures::sync::mpsc::Receiver<T>,
|
||||
) -> T {
|
||||
) -> (T, futures::sync::mpsc::Receiver<T>) {
|
||||
let (inner_sender, inner_receiver) = channel();
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
rt.spawn(futures::lazy(|| {
|
||||
|
@ -584,7 +617,9 @@ pub(crate) mod tests {
|
|||
.into_future()
|
||||
.timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS))
|
||||
.map(move |result| match result {
|
||||
(Some(value), _) => inner_sender.send(value).expect("send error"),
|
||||
(Some(value), receiver) => {
|
||||
inner_sender.send((value, receiver)).expect("send error")
|
||||
}
|
||||
(None, _) => panic!("unexpected end of stream"),
|
||||
})
|
||||
.map_err(|err| panic!("stream error {:?}", err));
|
||||
|
@ -638,7 +673,7 @@ pub(crate) mod tests {
|
|||
.contains_key(&alice.pubkey()));
|
||||
|
||||
subscriptions.notify_subscribers(0, &bank_forks);
|
||||
let response = robust_poll_or_panic(transport_receiver);
|
||||
let (response, _) = robust_poll_or_panic(transport_receiver);
|
||||
let expected = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "accountNotification",
|
||||
|
@ -712,7 +747,7 @@ pub(crate) mod tests {
|
|||
.contains_key(&solana_budget_program::id()));
|
||||
|
||||
subscriptions.notify_subscribers(0, &bank_forks);
|
||||
let response = robust_poll_or_panic(transport_receiver);
|
||||
let (response, _) = robust_poll_or_panic(transport_receiver);
|
||||
let expected = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "programNotification",
|
||||
|
@ -833,7 +868,7 @@ pub(crate) mod tests {
|
|||
"subscription": 2,
|
||||
}
|
||||
});
|
||||
let response = robust_poll_or_panic(past_bank_recv);
|
||||
let (response, _) = robust_poll_or_panic(past_bank_recv);
|
||||
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||
|
||||
let expected = json!({
|
||||
|
@ -847,7 +882,7 @@ pub(crate) mod tests {
|
|||
"subscription": 3,
|
||||
}
|
||||
});
|
||||
let response = robust_poll_or_panic(processed_recv);
|
||||
let (response, _) = robust_poll_or_panic(processed_recv);
|
||||
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
|
||||
|
||||
let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
|
||||
|
@ -881,7 +916,7 @@ pub(crate) mod tests {
|
|||
.contains_key(&sub_id));
|
||||
|
||||
subscriptions.notify_slot(0, 0, 0);
|
||||
let response = robust_poll_or_panic(transport_receiver);
|
||||
let (response, _) = robust_poll_or_panic(transport_receiver);
|
||||
let expected_res = SlotInfo {
|
||||
parent: 0,
|
||||
slot: 0,
|
||||
|
@ -903,6 +938,43 @@ pub(crate) mod tests {
|
|||
.contains_key(&sub_id));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_root_subscribe() {
|
||||
let (subscriber, _id_receiver, mut transport_receiver) =
|
||||
Subscriber::new_test("rootNotification");
|
||||
let sub_id = SubscriptionId::Number(0 as u64);
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let subscriptions = RpcSubscriptions::new(&exit);
|
||||
subscriptions.add_root_subscription(sub_id.clone(), subscriber);
|
||||
|
||||
assert!(subscriptions
|
||||
.root_subscriptions
|
||||
.read()
|
||||
.unwrap()
|
||||
.contains_key(&sub_id));
|
||||
|
||||
subscriptions.notify_roots(vec![2, 1, 3]);
|
||||
|
||||
for expected_root in 1..=3 {
|
||||
let (response, receiver) = robust_poll_or_panic(transport_receiver);
|
||||
transport_receiver = receiver;
|
||||
let expected_res_str =
|
||||
serde_json::to_string(&serde_json::to_value(expected_root).unwrap()).unwrap();
|
||||
let expected = format!(
|
||||
r#"{{"jsonrpc":"2.0","method":"rootNotification","params":{{"result":{},"subscription":0}}}}"#,
|
||||
expected_res_str
|
||||
);
|
||||
assert_eq!(expected, response);
|
||||
}
|
||||
|
||||
subscriptions.remove_root_subscription(&sub_id);
|
||||
assert!(!subscriptions
|
||||
.root_subscriptions
|
||||
.read()
|
||||
.unwrap()
|
||||
.contains_key(&sub_id));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_and_remove_subscription() {
|
||||
let mut subscriptions: HashMap<u64, HashMap<SubscriptionId, (Sink<()>, Confirmations)>> =
|
||||
|
|
|
@ -1255,7 +1255,7 @@ None
|
|||
|
||||
### slotUnsubscribe
|
||||
|
||||
Unsubscribe from signature confirmation notification
|
||||
Unsubscribe from slot notifications
|
||||
|
||||
#### Parameters:
|
||||
|
||||
|
@ -1274,3 +1274,55 @@ Unsubscribe from signature confirmation notification
|
|||
// Result
|
||||
{"jsonrpc": "2.0","result": true,"id": 1}
|
||||
```
|
||||
|
||||
### rootSubscribe
|
||||
|
||||
Subscribe to receive notification anytime a new root is set by the validator.
|
||||
|
||||
#### Parameters:
|
||||
|
||||
None
|
||||
|
||||
#### Results:
|
||||
|
||||
* `integer` - subscription id \(needed to unsubscribe\)
|
||||
|
||||
#### Example:
|
||||
|
||||
```bash
|
||||
// Request
|
||||
{"jsonrpc":"2.0", "id":1, "method":"rootSubscribe"}
|
||||
|
||||
// Result
|
||||
{"jsonrpc": "2.0","result": 0,"id": 1}
|
||||
```
|
||||
|
||||
#### Notification Format:
|
||||
|
||||
The result is the latest root slot number.
|
||||
|
||||
```bash
|
||||
{"jsonrpc": "2.0","method": "rootNotification", "params": {"result":42,"subscription":0}}
|
||||
```
|
||||
|
||||
### rootUnsubscribe
|
||||
|
||||
Unsubscribe from root notifications
|
||||
|
||||
#### Parameters:
|
||||
|
||||
* `<integer>` - subscription id to cancel
|
||||
|
||||
#### Results:
|
||||
|
||||
* `<bool>` - unsubscribe success message
|
||||
|
||||
#### Example:
|
||||
|
||||
```bash
|
||||
// Request
|
||||
{"jsonrpc":"2.0", "id":1, "method":"rootUnsubscribe", "params":[0]}
|
||||
|
||||
// Result
|
||||
{"jsonrpc": "2.0","result": true,"id": 1}
|
||||
```
|
||||
|
|
Loading…
Reference in New Issue