Add program subscriptions to rpc
This commit is contained in:
parent
0ef1fa7c76
commit
6a81f9e443
|
@ -34,6 +34,23 @@ pub trait RpcSolPubSub {
|
||||||
)]
|
)]
|
||||||
fn account_unsubscribe(&self, _: Option<Self::Metadata>, _: SubscriptionId) -> Result<bool>;
|
fn account_unsubscribe(&self, _: Option<Self::Metadata>, _: SubscriptionId) -> Result<bool>;
|
||||||
|
|
||||||
|
// Get notification every time account userdata owned by a particular program is changed
|
||||||
|
// Accepts pubkey parameter as base-58 encoded string
|
||||||
|
#[pubsub(
|
||||||
|
subscription = "programNotification",
|
||||||
|
subscribe,
|
||||||
|
name = "programSubscribe"
|
||||||
|
)]
|
||||||
|
fn program_subscribe(&self, _: Self::Metadata, _: Subscriber<(String, Account)>, _: String);
|
||||||
|
|
||||||
|
// Unsubscribe from account notification subscription.
|
||||||
|
#[pubsub(
|
||||||
|
subscription = "programNotification",
|
||||||
|
unsubscribe,
|
||||||
|
name = "programUnsubscribe"
|
||||||
|
)]
|
||||||
|
fn program_unsubscribe(&self, _: Option<Self::Metadata>, _: SubscriptionId) -> Result<bool>;
|
||||||
|
|
||||||
// Get notification when signature is verified
|
// Get notification when signature is verified
|
||||||
// Accepts signature parameter as base-58 encoded string
|
// Accepts signature parameter as base-58 encoded string
|
||||||
#[pubsub(
|
#[pubsub(
|
||||||
|
@ -113,6 +130,51 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn program_subscribe(
|
||||||
|
&self,
|
||||||
|
_meta: Self::Metadata,
|
||||||
|
subscriber: Subscriber<(String, Account)>,
|
||||||
|
pubkey_str: String,
|
||||||
|
) {
|
||||||
|
let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap();
|
||||||
|
if pubkey_vec.len() != mem::size_of::<Pubkey>() {
|
||||||
|
subscriber
|
||||||
|
.reject(Error {
|
||||||
|
code: ErrorCode::InvalidParams,
|
||||||
|
message: "Invalid Request: Invalid pubkey provided".into(),
|
||||||
|
data: None,
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let pubkey = Pubkey::new(&pubkey_vec);
|
||||||
|
|
||||||
|
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
|
||||||
|
let sub_id = SubscriptionId::Number(id as u64);
|
||||||
|
info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id);
|
||||||
|
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||||
|
|
||||||
|
self.subscriptions
|
||||||
|
.add_program_subscription(&pubkey, &sub_id, &sink)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn program_unsubscribe(
|
||||||
|
&self,
|
||||||
|
_meta: Option<Self::Metadata>,
|
||||||
|
id: SubscriptionId,
|
||||||
|
) -> Result<bool> {
|
||||||
|
info!("account_unsubscribe: id={:?}", id);
|
||||||
|
if self.subscriptions.remove_program_subscription(&id) {
|
||||||
|
Ok(true)
|
||||||
|
} else {
|
||||||
|
Err(Error {
|
||||||
|
code: ErrorCode::InvalidParams,
|
||||||
|
message: "Invalid Request: Subscription id does not exist".into(),
|
||||||
|
data: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn signature_subscribe(
|
fn signature_subscribe(
|
||||||
&self,
|
&self,
|
||||||
_meta: Self::Metadata,
|
_meta: Self::Metadata,
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
//! The `pubsub` module implements a threaded subscription service on client RPC request
|
//! The `pubsub` module implements a threaded subscription service on client RPC request
|
||||||
|
|
||||||
use crate::rpc_status::RpcSignatureStatus;
|
use crate::rpc_status::RpcSignatureStatus;
|
||||||
|
use bs58;
|
||||||
use core::hash::Hash;
|
use core::hash::Hash;
|
||||||
use jsonrpc_core::futures::Future;
|
use jsonrpc_core::futures::Future;
|
||||||
use jsonrpc_pubsub::typed::Sink;
|
use jsonrpc_pubsub::typed::Sink;
|
||||||
|
@ -13,6 +14,8 @@ use std::collections::HashMap;
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
|
|
||||||
type RpcAccountSubscriptions = RwLock<HashMap<Pubkey, HashMap<SubscriptionId, Sink<Account>>>>;
|
type RpcAccountSubscriptions = RwLock<HashMap<Pubkey, HashMap<SubscriptionId, Sink<Account>>>>;
|
||||||
|
type RpcProgramSubscriptions =
|
||||||
|
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, Sink<(String, Account)>>>>;
|
||||||
type RpcSignatureSubscriptions =
|
type RpcSignatureSubscriptions =
|
||||||
RwLock<HashMap<Signature, HashMap<SubscriptionId, Sink<RpcSignatureStatus>>>>;
|
RwLock<HashMap<Signature, HashMap<SubscriptionId, Sink<RpcSignatureStatus>>>>;
|
||||||
|
|
||||||
|
@ -57,6 +60,7 @@ where
|
||||||
|
|
||||||
pub struct RpcSubscriptions {
|
pub struct RpcSubscriptions {
|
||||||
account_subscriptions: RpcAccountSubscriptions,
|
account_subscriptions: RpcAccountSubscriptions,
|
||||||
|
program_subscriptions: RpcProgramSubscriptions,
|
||||||
signature_subscriptions: RpcSignatureSubscriptions,
|
signature_subscriptions: RpcSignatureSubscriptions,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,6 +68,7 @@ impl Default for RpcSubscriptions {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
RpcSubscriptions {
|
RpcSubscriptions {
|
||||||
account_subscriptions: RpcAccountSubscriptions::default(),
|
account_subscriptions: RpcAccountSubscriptions::default(),
|
||||||
|
program_subscriptions: RpcProgramSubscriptions::default(),
|
||||||
signature_subscriptions: RpcSignatureSubscriptions::default(),
|
signature_subscriptions: RpcSignatureSubscriptions::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,6 +84,17 @@ impl RpcSubscriptions {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn check_program(&self, program_id: &Pubkey, pubkey: &Pubkey, account: &Account) {
|
||||||
|
let subscriptions = self.program_subscriptions.write().unwrap();
|
||||||
|
if let Some(hashmap) = subscriptions.get(program_id) {
|
||||||
|
for (_bank_sub_id, sink) in hashmap.iter() {
|
||||||
|
sink.notify(Ok((bs58::encode(pubkey).into_string(), account.clone())))
|
||||||
|
.wait()
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn check_signature(&self, signature: &Signature, bank_error: &bank::Result<()>) {
|
pub fn check_signature(&self, signature: &Signature, bank_error: &bank::Result<()>) {
|
||||||
let status = match bank_error {
|
let status = match bank_error {
|
||||||
Ok(_) => RpcSignatureStatus::Confirmed,
|
Ok(_) => RpcSignatureStatus::Confirmed,
|
||||||
|
@ -111,6 +127,21 @@ impl RpcSubscriptions {
|
||||||
remove_subscription(&mut subscriptions, id)
|
remove_subscription(&mut subscriptions, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn add_program_subscription(
|
||||||
|
&self,
|
||||||
|
program_id: &Pubkey,
|
||||||
|
sub_id: &SubscriptionId,
|
||||||
|
sink: &Sink<(String, Account)>,
|
||||||
|
) {
|
||||||
|
let mut subscriptions = self.program_subscriptions.write().unwrap();
|
||||||
|
add_subscription(&mut subscriptions, program_id, sub_id, sink);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool {
|
||||||
|
let mut subscriptions = self.program_subscriptions.write().unwrap();
|
||||||
|
remove_subscription(&mut subscriptions, id)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn add_signature_subscription(
|
pub fn add_signature_subscription(
|
||||||
&self,
|
&self,
|
||||||
signature: &Signature,
|
signature: &Signature,
|
||||||
|
@ -139,6 +170,19 @@ impl RpcSubscriptions {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let programs: Vec<_> = {
|
||||||
|
let subs = self.program_subscriptions.read().unwrap();
|
||||||
|
subs.keys().cloned().collect()
|
||||||
|
};
|
||||||
|
for program_id in &programs {
|
||||||
|
let accounts = &bank.get_program_accounts_modified_since_parent(program_id);
|
||||||
|
if !accounts.is_empty() {
|
||||||
|
for (pubkey, account) in accounts.iter() {
|
||||||
|
self.check_program(program_id, pubkey, account);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let signatures: Vec<_> = {
|
let signatures: Vec<_> = {
|
||||||
let subs = self.signature_subscriptions.read().unwrap();
|
let subs = self.signature_subscriptions.read().unwrap();
|
||||||
subs.keys().cloned().collect()
|
subs.keys().cloned().collect()
|
||||||
|
@ -205,6 +249,52 @@ mod tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.contains_key(&alice.pubkey()));
|
.contains_key(&alice.pubkey()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_check_program_subscribe() {
|
||||||
|
let (genesis_block, mint_keypair) = GenesisBlock::new(100);
|
||||||
|
let bank = Bank::new(&genesis_block);
|
||||||
|
let alice = Keypair::new();
|
||||||
|
let blockhash = bank.last_blockhash();
|
||||||
|
let tx = SystemTransaction::new_program_account(
|
||||||
|
&mint_keypair,
|
||||||
|
alice.pubkey(),
|
||||||
|
blockhash,
|
||||||
|
1,
|
||||||
|
16,
|
||||||
|
solana_budget_api::id(),
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
bank.process_transaction(&tx).unwrap();
|
||||||
|
|
||||||
|
let (subscriber, _id_receiver, mut transport_receiver) =
|
||||||
|
Subscriber::new_test("programNotification");
|
||||||
|
let sub_id = SubscriptionId::Number(0 as u64);
|
||||||
|
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
|
||||||
|
let subscriptions = RpcSubscriptions::default();
|
||||||
|
subscriptions.add_program_subscription(&solana_budget_api::id(), &sub_id, &sink);
|
||||||
|
|
||||||
|
assert!(subscriptions
|
||||||
|
.program_subscriptions
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.contains_key(&solana_budget_api::id()));
|
||||||
|
|
||||||
|
let account = bank.get_account(&alice.pubkey()).unwrap();
|
||||||
|
subscriptions.check_program(&solana_budget_api::id(), &alice.pubkey(), &account);
|
||||||
|
let string = transport_receiver.poll();
|
||||||
|
if let Async::Ready(Some(response)) = string.unwrap() {
|
||||||
|
let expected = format!(r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":["{:?}",{{"executable":false,"lamports":1,"owner":[129,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}}],"subscription":0}}}}"#, alice.pubkey());
|
||||||
|
assert_eq!(expected, response);
|
||||||
|
}
|
||||||
|
|
||||||
|
subscriptions.remove_program_subscription(&sub_id);
|
||||||
|
assert!(!subscriptions
|
||||||
|
.program_subscriptions
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.contains_key(&solana_budget_api::id()));
|
||||||
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn test_check_signature_subscribe() {
|
fn test_check_signature_subscribe() {
|
||||||
let (genesis_block, mint_keypair) = GenesisBlock::new(100);
|
let (genesis_block, mint_keypair) = GenesisBlock::new(100);
|
||||||
|
|
Loading…
Reference in New Issue