diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index 8d3d7d3d0a..3dfbcd4b2b 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -259,14 +259,9 @@ impl RpcSolPubSubImpl { subscription: Arc::new(Default::default()), } } -} -impl RpcSolPubSub for RpcSolPubSubImpl { - type Metadata = Arc; - - fn account_subscribe( + fn subscribe_to_account_updates( &self, - _meta: Self::Metadata, subscriber: pubsub::Subscriber, pubkey_str: String, ) { @@ -292,22 +287,8 @@ impl RpcSolPubSub for RpcSolPubSubImpl { .add_account_subscription(&pubkey, &sub_id, &sink) } - fn account_unsubscribe(&self, id: SubscriptionId) -> Result { - info!("account_unsubscribe: id={:?}", id); - if self.subscription.remove_account_subscription(&id) { - Ok(true) - } else { - Err(Error { - code: ErrorCode::InvalidParams, - message: "Invalid Request: Subscription id does not exist".into(), - data: None, - }) - } - } - - fn signature_subscribe( + fn subscribe_to_signature_updates( &self, - _meta: Self::Metadata, subscriber: pubsub::Subscriber, signature_str: String, ) { @@ -351,6 +332,41 @@ impl RpcSolPubSub for RpcSolPubSubImpl { .add_signature_subscription(&signature, &sub_id, &sink), } } +} + +impl RpcSolPubSub for RpcSolPubSubImpl { + type Metadata = Arc; + + fn account_subscribe( + &self, + _meta: Self::Metadata, + subscriber: pubsub::Subscriber, + pubkey_str: String, + ) { + self.subscribe_to_account_updates(subscriber, pubkey_str) + } + + fn account_unsubscribe(&self, id: SubscriptionId) -> Result { + info!("account_unsubscribe: id={:?}", id); + if self.subscription.remove_account_subscription(&id) { + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } + + fn signature_subscribe( + &self, + _meta: Self::Metadata, + subscriber: pubsub::Subscriber, + signature_str: String, + ) { + self.subscribe_to_signature_updates(subscriber, signature_str) + } fn signature_unsubscribe(&self, id: SubscriptionId) -> Result { info!("signature_unsubscribe"); @@ -399,42 +415,17 @@ mod tests { let arc_bank = Arc::new(bank); let last_id = arc_bank.last_id(); - let (sender, mut receiver) = mpsc::channel(1); - let session = Arc::new(Session::new(sender)); - - let mut io = PubSubHandler::default(); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); - io.extend_with(rpc.to_delegate()); + let subscription = rpc.subscription.clone(); + arc_bank.set_subscriptions(Box::new(subscription)); // Test signature subscription let tx = Transaction::system_move(&alice, bob_pubkey, 20, last_id, 0); - let req = format!( - r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#, - tx.signatures[0].to_string() - ); - let res = io.handle_request_sync(&req, session.clone()); - let expected = format!(r#"{{"jsonrpc":"2.0","result":0,"id":1}}"#); - let expected: Response = - serde_json::from_str(&expected).expect("expected response deserialization"); - - let result: Response = serde_json::from_str(&res.expect("actual response")) - .expect("actual response deserialization"); - assert_eq!(expected, result); - - // Test bad parameter - let req = format!( - r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["a1b2c3"]}}"# - ); - let res = io.handle_request_sync(&req, session.clone()); - let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Invalid signature provided"}},"id":1}}"#); - let expected: Response = - serde_json::from_str(&expected).expect("expected response deserialization"); - - let result: Response = serde_json::from_str(&res.expect("actual response")) - .expect("actual response deserialization"); - assert_eq!(expected, result); + let (subscriber, _id_receiver, mut receiver) = + Subscriber::new_test("signatureNotification"); + rpc.subscribe_to_signature_updates(subscriber, tx.signatures[0].to_string()); arc_bank .process_transaction(&tx) @@ -448,21 +439,6 @@ mod tests { let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#); assert_eq!(expected, response); } - - // Test subscription id increment - let tx = Transaction::system_move(&alice, bob_pubkey, 10, last_id, 0); - let req = format!( - r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#, - tx.signatures[0].to_string() - ); - let res = io.handle_request_sync(&req, session.clone()); - let expected = format!(r#"{{"jsonrpc":"2.0","result":1,"id":1}}"#); - let expected: Response = - serde_json::from_str(&expected).expect("expected response deserialization"); - - let result: Response = serde_json::from_str(&res.expect("actual response")) - .expect("actual response deserialization"); - assert_eq!(expected, result); } #[test] @@ -527,40 +503,13 @@ mod tests { let arc_bank = Arc::new(bank); let last_id = arc_bank.last_id(); - let (sender, mut receiver) = mpsc::channel(1); - let session = Arc::new(Session::new(sender)); - - let mut io = PubSubHandler::default(); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); - io.extend_with(rpc.to_delegate()); + let subscription = rpc.subscription.clone(); + arc_bank.set_subscriptions(Box::new(subscription)); - let req = format!( - r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["{}"]}}"#, - contract_state.pubkey().to_string() - ); - - let res = io.handle_request_sync(&req, session.clone()); - let expected = format!(r#"{{"jsonrpc":"2.0","result":0,"id":1}}"#); - let expected: Response = - serde_json::from_str(&expected).expect("expected response deserialization"); - - let result: Response = serde_json::from_str(&res.expect("actual response")) - .expect("actual response deserialization"); - assert_eq!(expected, result); - - // Test bad parameter - let req = format!( - r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["a1b2c3"]}}"# - ); - let res = io.handle_request_sync(&req, session.clone()); - let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Invalid pubkey provided"}},"id":1}}"#); - let expected: Response = - serde_json::from_str(&expected).expect("expected response deserialization"); - - let result: Response = serde_json::from_str(&res.expect("actual response")) - .expect("actual response deserialization"); - assert_eq!(expected, result); + let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); + rpc.subscribe_to_account_updates(subscriber, contract_state.pubkey().to_string()); let tx = Transaction::system_create( &alice,