diff --git a/doc/json-rpc.md b/doc/json-rpc.md index e5ef61acfd..5091e4607f 100644 --- a/doc/json-rpc.md +++ b/doc/json-rpc.md @@ -5,12 +5,19 @@ Solana nodes accept HTTP requests using the [JSON-RPC 2.0](https://www.jsonrpc.o To interact with a Solana node inside a JavaScript application, use the [solana-web3.js](https://github.com/solana-labs/solana-web3.js) library, which gives a convenient interface for the RPC methods. -RPC Endpoint +RPC HTTP Endpoint --- **Default port:** 8899 eg. http://localhost:8899, http://192.168.1.88:8899 +RPC PubSub WebSocket Endpoint +--- + +**Default port:** 8900 +eg. ws://localhost:8900, http://192.168.1.88:8900 + + Methods --- @@ -234,33 +241,10 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m --- -### startSubscriptionChannel -Open a socket on the node for JSON-RPC subscription requests - -##### Parameters: -None - -##### Results: -* `string` - "port", open websocket port -* `string` - "path", unique key to use as websocket path - -##### Example: -```bash -// Request -curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc": "2.0","id":1,"method":"startSubscriptionChannel"}' http://localhost:8899 - -// Result -{"jsonrpc":"2.0","result":{"port":9876,"path":"BRbmMXn71cKfzXjFsmrTsWsXuQwbjXbwKdoRwVw1FRA3"},"id":1} -``` - ---- - ### Subscription Websocket -After opening a subscription socket with the `subscriptionChannel` JSON-RPC request method, submit subscription requests via websocket protocol -Connect to the websocket at `ws://
/` returned from the request +After connect to the RPC PubSub websocket at `ws://
/`: - Submit subscription requests to the websocket using the methods below - Multiple subscriptions may be active at once -- The subscription-channel socket will close when client closes websocket. To create new subscriptions, send a new `subscriptionChannel` JSON-RPC request. --- diff --git a/src/fullnode.rs b/src/fullnode.rs index c5cfe2c4d1..e9b2443598 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -9,6 +9,7 @@ use leader_scheduler::LeaderScheduler; use ledger::read_ledger; use ncp::Ncp; use rpc::{JsonRpcService, RPC_PORT}; +use rpc_pubsub::PubSubService; use rpu::Rpu; use service::Service; use signature::{Keypair, KeypairUtil}; @@ -88,6 +89,7 @@ pub struct Fullnode { exit: Arc, rpu: Option, rpc_service: JsonRpcService, + rpc_pubsub_service: PubSubService, ncp: Ncp, bank: Arc, cluster_info: Arc>, @@ -278,6 +280,12 @@ impl Fullnode { // Drone location/id will need to be handled a different way as soon as leader rotation begins let rpc_service = JsonRpcService::new(&bank, &cluster_info, rpc_addr, exit.clone()); + let rpc_pubsub_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::from(0)), + rpc_port.unwrap_or(RPC_PORT) + 1, + ); + let rpc_pubsub_service = PubSubService::new(&bank, rpc_pubsub_addr, exit.clone()); + let ncp = Ncp::new( &cluster_info, shared_window.clone(), @@ -373,6 +381,7 @@ impl Fullnode { rpu, ncp, rpc_service, + rpc_pubsub_service, node_role, ledger_path: ledger_path.to_owned(), exit, @@ -567,6 +576,7 @@ impl Service for Fullnode { } self.ncp.join()?; self.rpc_service.join()?; + self.rpc_pubsub_service.join()?; match self.node_role { Some(NodeRole::Validator(validator_service)) => { diff --git a/src/rpc.rs b/src/rpc.rs index 1ea384c2df..0d7a52f279 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -3,15 +3,12 @@ use bank::{Bank, BankError}; use bincode::deserialize; use bs58; -use cluster_info::{ClusterInfo, FULLNODE_PORT_RANGE}; +use cluster_info::ClusterInfo; use drone::DRONE_PORT; use jsonrpc_core::*; use jsonrpc_http_server::*; -use jsonrpc_macros::pubsub::Sink; -use netutil::find_available_port_in_range; -use rpc_pubsub::{PubSubService, SubscriptionResponse}; use service::Service; -use signature::{Keypair, KeypairUtil, Signature}; +use signature::Signature; use solana_program_interface::account::Account; use solana_program_interface::pubkey::Pubkey; use std::mem; @@ -63,13 +60,10 @@ impl JsonRpcService { warn!("JSON RPC service unavailable: unable to bind to RPC port {}. \nMake sure this port is not already in use by another application", rpc_addr.port()); return; } - loop { - if exit.load(Ordering::Relaxed) { - server.unwrap().close(); - break; - } + while !exit.load(Ordering::Relaxed) { sleep(Duration::from_millis(100)); } + server.unwrap().close(); () }) .unwrap(); @@ -132,9 +126,6 @@ build_rpc_trait! { #[rpc(meta, name = "sendTransaction")] fn send_transaction(&self, Self::Metadata, Vec) -> Result; - - #[rpc(meta, name = "startSubscriptionChannel")] - fn start_subscription_channel(&self, Self::Metadata) -> Result; } } @@ -214,22 +205,6 @@ impl RpcSol for RpcSolImpl { })?; Ok(bs58::encode(tx.signature).into_string()) } - fn start_subscription_channel(&self, meta: Self::Metadata) -> Result { - let port: u16 = find_available_port_in_range(FULLNODE_PORT_RANGE).map_err(|_| Error { - code: ErrorCode::InternalError, - message: "No available port in range".into(), - data: None, - })?; - let mut pubsub_addr = meta.rpc_addr; - pubsub_addr.set_port(port); - let pubkey = Keypair::new().pubkey(); - let _pubsub_service = - PubSubService::new(&meta.request_processor.bank, pubsub_addr, pubkey, meta.exit); - Ok(SubscriptionResponse { - port, - path: pubkey.to_string(), - }) - } } #[derive(Clone)] pub struct JsonRpcRequestProcessor { @@ -264,31 +239,6 @@ impl JsonRpcRequestProcessor { fn get_transaction_count(&self) -> Result { Ok(self.bank.transaction_count() as u64) } - pub fn add_account_subscription( - &self, - bank_sub_id: Pubkey, - pubkey: Pubkey, - sink: Sink, - ) { - self.bank - .add_account_subscription(bank_sub_id, pubkey, sink); - } - pub fn remove_account_subscription(&self, bank_sub_id: &Pubkey, pubkey: &Pubkey) { - self.bank.remove_account_subscription(bank_sub_id, pubkey); - } - pub fn add_signature_subscription( - &self, - bank_sub_id: Pubkey, - signature: Signature, - sink: Sink, - ) { - self.bank - .add_signature_subscription(bank_sub_id, signature, sink); - } - pub fn remove_signature_subscription(&self, bank_sub_id: &Pubkey, signature: &Signature) { - self.bank - .remove_signature_subscription(bank_sub_id, signature); - } } fn get_leader_addr(cluster_info: &Arc>) -> Result { diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index c8a0ad7346..684371bb6d 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -5,10 +5,10 @@ use bs58; use jsonrpc_core::futures::Future; use jsonrpc_core::*; use jsonrpc_macros::pubsub; -use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; -use jsonrpc_ws_server::ws; +use jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId}; use jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder}; use rpc::{JsonRpcRequestProcessor, RpcSignatureStatus}; +use service::Service; use signature::{Keypair, KeypairUtil, Signature}; use solana_program_interface::account::Account; use solana_program_interface::pubkey::Pubkey; @@ -16,8 +16,8 @@ use std::collections::HashMap; use std::mem; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{atomic, Arc, Mutex, RwLock}; -use std::thread::{sleep, Builder, JoinHandle}; +use std::sync::{atomic, Arc, RwLock}; +use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; pub enum ClientState { @@ -25,87 +25,59 @@ pub enum ClientState { Init(Sender), } -#[derive(Serialize)] -pub struct SubscriptionResponse { - pub port: u16, - pub path: String, +pub struct PubSubService { + thread_hdl: JoinHandle<()>, } -pub struct PubSubService { - _thread_hdl: JoinHandle<()>, +impl Service for PubSubService { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } } impl PubSubService { - pub fn new( - bank: &Arc, - pubsub_addr: SocketAddr, - path: Pubkey, - exit: Arc, - ) -> Self { - let request_processor = JsonRpcRequestProcessor::new(bank.clone()); - let status = Arc::new(Mutex::new(ClientState::Uninitialized)); - let client_status = status.clone(); - let server_bank = bank.clone(); - let _thread_hdl = Builder::new() + pub fn new(bank: &Arc, pubsub_addr: SocketAddr, exit: Arc) -> Self { + let rpc = RpcSolPubSubImpl::new(JsonRpcRequestProcessor::new(bank.clone()), bank.clone()); + let thread_hdl = Builder::new() .name("solana-pubsub".to_string()) .spawn(move || { let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::default(); - let account_subs = rpc.account_subscriptions.clone(); - let signature_subs = rpc.signature_subscriptions.clone(); io.extend_with(rpc.to_delegate()); - let server = ServerBuilder::with_meta_extractor(io, move |context: &RequestContext| - { - *client_status.lock().unwrap() = ClientState::Init(context.out.clone()); - Meta { - request_processor: request_processor.clone(), - session: Arc::new(Session::new(context.sender().clone())), - } - }) - .request_middleware(move |req: &ws::Request| - if req.resource() != format!("/{}", path.to_string()) { - Some(ws::Response::new(403, "Client path incorrect or not provided")) - } else { - None - }) - .start(&pubsub_addr); + let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| { + info!("New pubsub connection"); + let session = Arc::new(Session::new(context.sender().clone())); + session.on_drop(Box::new(|| { + info!("Pubsub connection dropped"); + // Following should not be required as jsonrpc_pubsub will + // unsubscribe automatically once the websocket is dropped ... + /* + for (_, (bank_sub_id, pubkey)) in self.account_subscriptions.read().unwrap().iter() { + server_bank.remove_account_subscription(bank_sub_id, pubkey); + } + for (_, (bank_sub_id, signature)) in self.signature_subscriptions.read().unwrap().iter() { + server_bank.remove_signature_subscription(bank_sub_id, signature); + } + */ + })); + session + }) + .start(&pubsub_addr); if server.is_err() { warn!("Pubsub service unavailable: unable to bind to port {}. \nMake sure this port is not already in use by another application", pubsub_addr.port()); return; } while !exit.load(Ordering::Relaxed) { - if let ClientState::Init(ref mut sender) = *status.lock().unwrap() { - if sender.check_active().is_err() { - break; - } - } sleep(Duration::from_millis(100)); } - for (_, (bank_sub_id, pubkey)) in account_subs.read().unwrap().iter() { - server_bank.remove_account_subscription(bank_sub_id, pubkey); - } - for (_, (bank_sub_id, signature)) in signature_subs.read().unwrap().iter() { - server_bank.remove_signature_subscription(bank_sub_id, signature); - } server.unwrap().close(); () }) .unwrap(); - PubSubService { _thread_hdl } - } -} - -#[derive(Clone)] -pub struct Meta { - pub request_processor: JsonRpcRequestProcessor, - pub session: Arc, -} -impl Metadata for Meta {} -impl PubSubMetadata for Meta { - fn session(&self) -> Option> { - Some(self.session.clone()) + PubSubService { thread_hdl } } } @@ -136,21 +108,36 @@ build_rpc_trait! { } } -#[derive(Default)] struct RpcSolPubSubImpl { - uid: atomic::AtomicUsize, + uid: Arc, + request_processor: JsonRpcRequestProcessor, + bank: Arc, account_subscriptions: Arc>>, signature_subscriptions: Arc>>, } + +impl RpcSolPubSubImpl { + fn new(request_processor: JsonRpcRequestProcessor, bank: Arc) -> Self { + RpcSolPubSubImpl { + uid: Default::default(), + request_processor, + bank, + account_subscriptions: Default::default(), + signature_subscriptions: Default::default(), + } + } +} + impl RpcSolPubSub for RpcSolPubSubImpl { - type Metadata = Meta; + type Metadata = Arc; fn account_subscribe( &self, - meta: Self::Metadata, + _meta: Self::Metadata, subscriber: pubsub::Subscriber, pubkey_str: String, ) { + info!("account_subscribe"); let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap(); if pubkey_vec.len() != mem::size_of::() { subscriber @@ -172,15 +159,15 @@ impl RpcSolPubSub for RpcSolPubSubImpl { .unwrap() .insert(sub_id.clone(), (bank_sub_id, pubkey)); - meta.request_processor + self.bank .add_account_subscription(bank_sub_id, pubkey, sink); } - fn account_unsubscribe(&self, meta: Self::Metadata, id: SubscriptionId) -> Result { + fn account_unsubscribe(&self, _meta: Self::Metadata, id: SubscriptionId) -> Result { + info!("account_unsubscribe"); if let Some((bank_sub_id, pubkey)) = self.account_subscriptions.write().unwrap().remove(&id) { - meta.request_processor - .remove_account_subscription(&bank_sub_id, &pubkey); + self.bank.remove_account_subscription(&bank_sub_id, &pubkey); Ok(true) } else { Err(Error { @@ -193,10 +180,11 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn signature_subscribe( &self, - meta: Self::Metadata, + _meta: Self::Metadata, subscriber: pubsub::Subscriber, signature_str: String, ) { + info!("signature_subscribe"); let signature_vec = bs58::decode(signature_str).into_vec().unwrap(); if signature_vec.len() != mem::size_of::() { subscriber @@ -218,7 +206,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { .unwrap() .insert(sub_id.clone(), (bank_sub_id, signature)); - match meta.request_processor.get_signature_status(signature) { + match self.request_processor.get_signature_status(signature) { Ok(_) => { sink.notify(Ok(RpcSignatureStatus::Confirmed)) .wait() @@ -229,17 +217,18 @@ impl RpcSolPubSub for RpcSolPubSubImpl { .remove(&sub_id); } Err(_) => { - meta.request_processor + self.bank .add_signature_subscription(bank_sub_id, signature, sink); } } } - fn signature_unsubscribe(&self, meta: Self::Metadata, id: SubscriptionId) -> Result { + fn signature_unsubscribe(&self, _meta: Self::Metadata, id: SubscriptionId) -> Result { + info!("signature_unsubscribe"); if let Some((bank_sub_id, signature)) = self.signature_subscriptions.write().unwrap().remove(&id) { - meta.request_processor + self.bank .remove_signature_subscription(&bank_sub_id, &signature); Ok(true) } else { @@ -270,10 +259,9 @@ mod tests { let alice = Mint::new(10_000); let bank = Bank::new(&alice); let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let pubkey = Keypair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr, pubkey, exit); - let thread = pubsub_service._thread_hdl.thread(); + let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr, exit); + let thread = pubsub_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-pubsub"); } @@ -286,17 +274,15 @@ mod tests { let arc_bank = Arc::new(bank); let last_id = arc_bank.last_id(); - let request_processor = JsonRpcRequestProcessor::new(arc_bank.clone()); let (sender, mut receiver) = mpsc::channel(1); let session = Arc::new(Session::new(sender)); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::default(); + let rpc = RpcSolPubSubImpl::new( + JsonRpcRequestProcessor::new(arc_bank.clone()), + arc_bank.clone(), + ); io.extend_with(rpc.to_delegate()); - let meta = Meta { - request_processor, - session, - }; // Test signature subscription let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0); @@ -305,7 +291,7 @@ mod tests { r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#, tx.signature.to_string() ); - let res = io.handle_request_sync(&req, meta.clone()); + let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","result":0,"id":1}}"#); let expected: Response = serde_json::from_str(&expected).expect("expected response deserialization"); @@ -318,7 +304,7 @@ mod tests { let req = format!( r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["a1b2c3"]}}"# ); - let res = io.handle_request_sync(&req, meta.clone()); + let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Invalid signature provided"}},"id":1}}"#); let expected: Response = serde_json::from_str(&expected).expect("expected response deserialization"); @@ -346,7 +332,7 @@ mod tests { r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#, tx.signature.to_string() ); - let res = io.handle_request_sync(&req, meta.clone()); + let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","result":1,"id":1}}"#); let expected: Response = serde_json::from_str(&expected).expect("expected response deserialization"); @@ -364,28 +350,26 @@ mod tests { let arc_bank = Arc::new(bank); let last_id = arc_bank.last_id(); - let request_processor = JsonRpcRequestProcessor::new(arc_bank); let (sender, _receiver) = mpsc::channel(1); let session = Arc::new(Session::new(sender)); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::default(); + let rpc = RpcSolPubSubImpl::new( + JsonRpcRequestProcessor::new(arc_bank.clone()), + arc_bank.clone(), + ); io.extend_with(rpc.to_delegate()); - let meta = Meta { - request_processor, - session: session.clone(), - }; let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0); let req = format!( r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#, tx.signature.to_string() ); - let _res = io.handle_request_sync(&req, meta.clone()); + let _res = io.handle_request_sync(&req, session.clone()); let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[0]}}"#); - let res = io.handle_request_sync(&req, meta.clone()); + let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#); let expected: Response = @@ -398,7 +382,7 @@ mod tests { // Test bad parameter let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[1]}}"#); - let res = io.handle_request_sync(&req, meta.clone()); + let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#); let expected: Response = serde_json::from_str(&expected).expect("expected response deserialization"); @@ -420,24 +404,22 @@ mod tests { let arc_bank = Arc::new(bank); let last_id = arc_bank.last_id(); - let request_processor = JsonRpcRequestProcessor::new(arc_bank.clone()); let (sender, mut receiver) = mpsc::channel(1); let session = Arc::new(Session::new(sender)); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::default(); + let rpc = RpcSolPubSubImpl::new( + JsonRpcRequestProcessor::new(arc_bank.clone()), + arc_bank.clone(), + ); io.extend_with(rpc.to_delegate()); - let meta = Meta { - request_processor, - session, - }; let req = format!( r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["{}"]}}"#, contract_state.pubkey().to_string() ); - let res = io.handle_request_sync(&req, meta.clone()); + let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","result":0,"id":1}}"#); let expected: Response = serde_json::from_str(&expected).expect("expected response deserialization"); @@ -450,7 +432,7 @@ mod tests { let req = format!( r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["a1b2c3"]}}"# ); - let res = io.handle_request_sync(&req, meta.clone()); + let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Invalid pubkey provided"}},"id":1}}"#); let expected: Response = serde_json::from_str(&expected).expect("expected response deserialization"); @@ -481,6 +463,7 @@ mod tests { budget_program_id, 0, ); + arc_bank .process_transaction(&tx) .expect("process transaction"); @@ -488,10 +471,12 @@ mod tests { // Test signature confirmation notification #1 let string = receiver.poll(); assert!(string.is_ok()); + let expected_userdata = arc_bank .get_account(&contract_state.pubkey()) .unwrap() .userdata; + let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", @@ -593,27 +578,26 @@ mod tests { let bank = Bank::new(&alice); let arc_bank = Arc::new(bank); - let request_processor = JsonRpcRequestProcessor::new(arc_bank); let (sender, _receiver) = mpsc::channel(1); let session = Arc::new(Session::new(sender)); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::default(); + let rpc = RpcSolPubSubImpl::new( + JsonRpcRequestProcessor::new(arc_bank.clone()), + arc_bank.clone(), + ); + io.extend_with(rpc.to_delegate()); - let meta = Meta { - request_processor, - session: session.clone(), - }; let req = format!( r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["{}"]}}"#, bob_pubkey.to_string() ); - let _res = io.handle_request_sync(&req, meta.clone()); + let _res = io.handle_request_sync(&req, session.clone()); let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[0]}}"#); - let res = io.handle_request_sync(&req, meta.clone()); + let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#); let expected: Response = @@ -626,7 +610,7 @@ mod tests { // Test bad parameter let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[1]}}"#); - let res = io.handle_request_sync(&req, meta.clone()); + let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#); let expected: Response = serde_json::from_str(&expected).expect("expected response deserialization");