From c6662a45125ee741ea0a9ece18588a2091fd0f75 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 14 Aug 2018 18:03:48 -0600 Subject: [PATCH] Implement Rpc in Fullnode --- Cargo.toml | 4 - src/bin/json-rpc.rs | 83 ----------- src/fullnode.rs | 13 +- src/rpc.rs | 352 +++++++++++++++++++++++++++----------------- 4 files changed, 231 insertions(+), 221 deletions(-) delete mode 100644 src/bin/json-rpc.rs diff --git a/Cargo.toml b/Cargo.toml index 222212f82..299b11e74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,10 +53,6 @@ path = "src/bin/keygen.rs" name = "solana-wallet" path = "src/bin/wallet.rs" -[[bin]] -name = "solana-json-rpc" -path = "src/bin/json-rpc.rs" - [badges] codecov = { repository = "solana-labs/solana", branch = "master", service = "github" } diff --git a/src/bin/json-rpc.rs b/src/bin/json-rpc.rs deleted file mode 100644 index b62dca8c8..000000000 --- a/src/bin/json-rpc.rs +++ /dev/null @@ -1,83 +0,0 @@ -//! The `json-rpc` service launches an HTTP server to listen for -//! Solana rpc requests. - -#[macro_use] -extern crate clap; -extern crate dirs; -extern crate jsonrpc_core; -extern crate jsonrpc_http_server; -extern crate solana; - -use clap::{App, Arg}; -use jsonrpc_core::*; -use jsonrpc_http_server::*; -use solana::crdt::NodeInfo; -use solana::fullnode::Config; -use solana::rpc::*; -use std::error; -use std::fs::File; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::result::Result; - -fn main() -> Result<(), Box> { - let matches = App::new("json-rpc") - .version(crate_version!()) - .arg( - Arg::with_name("leader") - .short("l") - .long("leader") - .value_name("PATH") - .takes_value(true) - .help("/path/to/leader.json"), - ) - .arg( - Arg::with_name("keypair") - .short("k") - .long("keypair") - .value_name("PATH") - .takes_value(true) - .required(true) - .help("/path/to/id.json"), - ) - .get_matches(); - let leader: NodeInfo; - if let Some(l) = matches.value_of("leader") { - leader = read_leader(l).node_info; - } else { - let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); - leader = NodeInfo::new_leader(&server_addr); - }; - - let id_path: String = matches - .value_of("keypair") - .expect("Unable to parse keypair path") - .to_string(); - - let mut io = MetaIoHandler::default(); - let rpc = RpcSolImpl; - io.extend_with(rpc.to_delegate()); - - let rpc_addr = format!("0.0.0.0:{}", RPC_PORT); - let server = ServerBuilder::new(io) - .meta_extractor(move |_req: &hyper::Request| Meta { - leader: Some(leader.clone()), - keypair_location: Some(id_path.clone()), - }) - .threads(4) - .cors(DomainsValidation::AllowOnly(vec![ - AccessControlAllowOrigin::Any, - ])) - .start_http( - &rpc_addr - .parse() - .expect("Unable to parse RPC server address"), - )?; - - server.wait(); - Ok(()) -} - -fn read_leader(path: &str) -> Config { - let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path)); - serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path)) -} diff --git a/src/fullnode.rs b/src/fullnode.rs index 6f6653f35..ccdfcb530 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -7,6 +7,7 @@ use entry::Entry; use ledger::read_ledger; use ncp::Ncp; use packet::BlobRecycler; +use rpc::{JsonRpcService, RPC_PORT}; use rpu::Rpu; use service::Service; use signature::{Keypair, KeypairUtil}; @@ -190,7 +191,7 @@ impl Fullnode { let tick_duration = None; // TODO: To light up PoH, uncomment the following line: //let tick_duration = Some(Duration::from_millis(1000)); - + // let node_info = node.data.clone(); let bank = Arc::new(bank); let mut thread_hdls = vec![]; let rpu = Rpu::new( @@ -201,6 +202,11 @@ impl Fullnode { ); thread_hdls.extend(rpu.thread_hdls()); + let mut rpc_addr = node.data.contact_info.ncp; + rpc_addr.set_port(RPC_PORT); + let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr); + thread_hdls.extend(rpc_service.thread_hdls()); + let blob_recycler = BlobRecycler::default(); let window = window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler); @@ -292,6 +298,11 @@ impl Fullnode { ); thread_hdls.extend(rpu.thread_hdls()); + let mut rpc_addr = node.data.contact_info.ncp; + rpc_addr.set_port(RPC_PORT); + let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr); + thread_hdls.extend(rpc_service.thread_hdls()); + let blob_recycler = BlobRecycler::default(); let window = window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler); diff --git a/src/rpc.rs b/src/rpc.rs index 53a353374..1254e961b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,25 +1,70 @@ //! The `rpc` module implements the Solana rpc interface. +use bank::Bank; use bs58; -use client::mk_client; -use crdt::NodeInfo; use jsonrpc_core::*; -use signature::{read_keypair, KeypairUtil, Pubkey, Signature}; +use jsonrpc_http_server::*; +use request::{Request as JsonRpcRequest, Response}; +use service::Service; +use signature::{Pubkey, Signature}; use std::mem; +use std::net::SocketAddr; +use std::sync::Arc; +use std::thread::{self, Builder, JoinHandle}; pub const RPC_PORT: u16 = 8899; +pub struct JsonRpcService { + thread_hdl: JoinHandle<()>, +} + +impl JsonRpcService { + pub fn new(bank: Arc, rpc_addr: SocketAddr) -> Self { + let request_processor = JsonRpcRequestProcessor::new(bank); + let thread_hdl = Builder::new() + .name("solana-jsonrpc".to_string()) + .spawn(move || { + let mut io = MetaIoHandler::default(); + let rpc = RpcSolImpl; + io.extend_with(rpc.to_delegate()); + + let server = ServerBuilder::new(io) + .meta_extractor(move |_req: &hyper::Request| Meta { + request_processor: Some(request_processor.clone()), + }) + .threads(4) + .cors(DomainsValidation::AllowOnly(vec![ + AccessControlAllowOrigin::Any, + ])) + .start_http(&rpc_addr) + .unwrap(); + server.wait(); + () + }) + .unwrap(); + JsonRpcService { thread_hdl } + } +} + +impl Service for JsonRpcService { + fn thread_hdls(self) -> Vec> { + vec![self.thread_hdl] + } + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + #[derive(Clone)] pub struct Meta { - pub leader: Option, - pub keypair_location: Option, + pub request_processor: Option, } impl Metadata for Meta {} impl Default for Meta { fn default() -> Self { Meta { - leader: None, - keypair_location: None, + request_processor: None, } } } @@ -28,14 +73,14 @@ build_rpc_trait! { pub trait RpcSol { type Metadata; - #[rpc(meta, name = "solana_getAddress")] - fn address(&self, Self::Metadata) -> Result; - #[rpc(meta, name = "solana_confirmTransaction")] fn confirm_transaction(&self, Self::Metadata, String) -> Result; #[rpc(meta, name = "solana_getBalance")] - fn get_balance(&self, Self::Metadata, String) -> Result; + fn get_balance(&self, Self::Metadata, String) -> Result<(String, i64)>; + + #[rpc(meta, name = "solana_getFinality")] + fn get_finality(&self, Self::Metadata) -> Result; #[rpc(meta, name = "solana_getLastId")] fn get_last_id(&self, Self::Metadata) -> Result; @@ -43,8 +88,8 @@ build_rpc_trait! { #[rpc(meta, name = "solana_getTransactionCount")] fn get_transaction_count(&self, Self::Metadata) -> Result; - #[rpc(meta, name = "solana_sendTransaction")] - fn send_transaction(&self, Self::Metadata, String, i64) -> Result; + // #[rpc(meta, name = "solana_sendTransaction")] + // fn send_transaction(&self, Self::Metadata, String, i64) -> Result; } } @@ -52,10 +97,6 @@ pub struct RpcSolImpl; impl RpcSol for RpcSolImpl { type Metadata = Meta; - fn address(&self, meta: Self::Metadata) -> Result { - let client_keypair = read_keypair(&meta.keypair_location.unwrap()); - Ok(bs58::encode(client_keypair.unwrap().pubkey()).into_string()) - } fn confirm_transaction(&self, meta: Self::Metadata, id: String) -> Result { let signature_vec = bs58::decode(id) .into_vec() @@ -65,14 +106,24 @@ impl RpcSol for RpcSolImpl { Err(Error::invalid_request()) } else { let signature = Signature::new(&signature_vec); - - let mut client = mk_client(&meta.leader.unwrap()); - - let confirmation = client.check_signature(&signature); - Ok(confirmation) + let req = JsonRpcRequest::GetSignature { signature }; + let resp = meta.request_processor.unwrap().process_request(req); + match resp { + Some(Response::SignatureStatus { signature_status }) => Ok(signature_status), + Some(_) => Err(Error { + code: ErrorCode::ServerError(-32002), + message: "Server error: bad response".to_string(), + data: None, + }), + None => Err(Error { + code: ErrorCode::ServerError(-32001), + message: "Server error: no node found".to_string(), + data: None, + }), + } } } - fn get_balance(&self, meta: Self::Metadata, id: String) -> Result { + fn get_balance(&self, meta: Self::Metadata, id: String) -> Result<(String, i64)> { let pubkey_vec = bs58::decode(id) .into_vec() .expect("base58-encoded public key"); @@ -81,13 +132,16 @@ impl RpcSol for RpcSolImpl { Err(Error::invalid_request()) } else { let pubkey = Pubkey::new(&pubkey_vec); - - let mut client = mk_client(&meta.leader.unwrap()); - - let balance = client.poll_get_balance(&pubkey); - match balance { - Ok(balance) => Ok(balance), - Err(_) => Err(Error { + let req = JsonRpcRequest::GetBalance { key: pubkey }; + let resp = meta.request_processor.unwrap().process_request(req); + match resp { + Some(Response::Balance { key, val }) => Ok((bs58::encode(key).into_string(), val)), + Some(_) => Err(Error { + code: ErrorCode::ServerError(-32002), + message: "Server error: bad response".to_string(), + data: None, + }), + None => Err(Error { code: ErrorCode::ServerError(-32001), message: "Server error: no node found".to_string(), data: None, @@ -95,32 +149,119 @@ impl RpcSol for RpcSolImpl { } } } + fn get_finality(&self, meta: Self::Metadata) -> Result { + let req = JsonRpcRequest::GetFinality; + let resp = meta.request_processor.unwrap().process_request(req); + match resp { + Some(Response::Finality { time }) => Ok(time), + Some(_) => Err(Error { + code: ErrorCode::ServerError(-32002), + message: "Server error: bad response".to_string(), + data: None, + }), + None => Err(Error { + code: ErrorCode::ServerError(-32001), + message: "Server error: no node found".to_string(), + data: None, + }), + } + } fn get_last_id(&self, meta: Self::Metadata) -> Result { - let mut client = mk_client(&meta.leader.unwrap()); - let last_id = client.get_last_id(); - Ok(bs58::encode(last_id).into_string()) + let req = JsonRpcRequest::GetLastId; + let resp = meta.request_processor.unwrap().process_request(req); + match resp { + Some(Response::LastId { id }) => Ok(bs58::encode(id).into_string()), + Some(_) => Err(Error { + code: ErrorCode::ServerError(-32002), + message: "Server error: bad response".to_string(), + data: None, + }), + None => Err(Error { + code: ErrorCode::ServerError(-32001), + message: "Server error: no node found".to_string(), + data: None, + }), + } } fn get_transaction_count(&self, meta: Self::Metadata) -> Result { - let mut client = mk_client(&meta.leader.unwrap()); - let tx_count = client.transaction_count(); - Ok(tx_count) + let req = JsonRpcRequest::GetTransactionCount; + let resp = meta.request_processor.unwrap().process_request(req); + match resp { + Some(Response::TransactionCount { transaction_count }) => Ok(transaction_count), + Some(_) => Err(Error { + code: ErrorCode::ServerError(-32002), + message: "Server error: bad response".to_string(), + data: None, + }), + None => Err(Error { + code: ErrorCode::ServerError(-32001), + message: "Server error: no node found".to_string(), + data: None, + }), + } + } + // fn send_transaction(&self, meta: Self::Metadata, to: String, tokens: i64) -> Result { + // let client_keypair = read_keypair(&meta.keypair_location.unwrap()).unwrap(); + // let mut client = mk_client(&meta.leader.unwrap()); + // let last_id = client.get_last_id(); + // let to_pubkey_vec = bs58::decode(to) + // .into_vec() + // .expect("base58-encoded public key"); + // + // if to_pubkey_vec.len() != mem::size_of::() { + // Err(Error::invalid_request()) + // } else { + // let to_pubkey = Pubkey::new(&to_pubkey_vec); + // let signature = client + // .transfer(tokens, &client_keypair, to_pubkey, &last_id) + // .unwrap(); + // Ok(bs58::encode(signature).into_string()) + // } + // } +} +#[derive(Clone)] +pub struct JsonRpcRequestProcessor { + bank: Arc, +} +impl JsonRpcRequestProcessor { + /// Create a new request processor that wraps the given Bank. + pub fn new(bank: Arc) -> Self { + JsonRpcRequestProcessor { bank } } - fn send_transaction(&self, meta: Self::Metadata, to: String, tokens: i64) -> Result { - let client_keypair = read_keypair(&meta.keypair_location.unwrap()).unwrap(); - let mut client = mk_client(&meta.leader.unwrap()); - let last_id = client.get_last_id(); - let to_pubkey_vec = bs58::decode(to) - .into_vec() - .expect("base58-encoded public key"); - if to_pubkey_vec.len() != mem::size_of::() { - Err(Error::invalid_request()) - } else { - let to_pubkey = Pubkey::new(&to_pubkey_vec); - let signature = client - .transfer(tokens, &client_keypair, to_pubkey, &last_id) - .unwrap(); - Ok(bs58::encode(signature).into_string()) + /// Process Request items sent via JSON-RPC. + fn process_request(&self, msg: JsonRpcRequest) -> Option { + match msg { + JsonRpcRequest::GetBalance { key } => { + let val = self.bank.get_balance(&key); + let rsp = Response::Balance { key, val }; + info!("Response::Balance {:?}", rsp); + Some(rsp) + } + JsonRpcRequest::GetLastId => { + let id = self.bank.last_id(); + let rsp = Response::LastId { id }; + info!("Response::LastId {:?}", rsp); + Some(rsp) + } + JsonRpcRequest::GetTransactionCount => { + let transaction_count = self.bank.transaction_count() as u64; + let rsp = Response::TransactionCount { transaction_count }; + info!("Response::TransactionCount {:?}", rsp); + Some(rsp) + } + JsonRpcRequest::GetSignature { signature } => { + let signature_status = self.bank.has_signature(&signature); + let rsp = Response::SignatureStatus { signature_status }; + info!("Response::Signature {:?}", rsp); + Some(rsp) + } + JsonRpcRequest::GetFinality => { + let time = self.bank.finality(); + let rsp = Response::Finality { time }; + info!("Response::Finality {:?}", rsp); + Some(rsp) + } } } } @@ -129,108 +270,56 @@ impl RpcSol for RpcSolImpl { mod tests { use super::*; use bank::Bank; - use crdt::{get_ip_addr, TestNode}; - use drone::{Drone, DroneRequest}; - use fullnode::FullNode; use jsonrpc_core::Response; - use ledger::LedgerWriter; use mint::Mint; - use service::Service; use signature::{Keypair, KeypairUtil}; - use std::fs::remove_dir_all; - use std::net::SocketAddr; - use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; - use std::thread::sleep; - use std::time::Duration; - - fn tmp_ledger(name: &str, mint: &Mint) -> String { - let keypair = Keypair::new(); - - let path = format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey()); - - let mut writer = LedgerWriter::open(&path, true).unwrap(); - writer.write_entries(mint.create_entries()).unwrap(); - - path - } + use transaction::Transaction; #[test] fn test_rpc_request() { - let leader_keypair = Keypair::new(); - let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); - let leader_data = leader.data.clone(); - let alice = Mint::new(10_000); - let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); - let exit = Arc::new(AtomicBool::new(false)); - let ledger_path = tmp_ledger("rpc_request", &alice); + let bank = Bank::new(&alice); - let server = FullNode::new_leader( - leader_keypair, - bank, - 0, - None, - Some(Duration::from_millis(30)), - leader, - exit.clone(), - &ledger_path, - false, - ); - sleep(Duration::from_millis(900)); + let last_id = bank.last_id(); + let tx = Transaction::new(&alice.keypair(), bob_pubkey, 20, last_id); + bank.process_transaction(&tx).expect("process transaction"); + + let request_processor = JsonRpcRequestProcessor::new(Arc::new(bank)); let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; io.extend_with(rpc.to_delegate()); - format!("0.0.0.0:{}", RPC_PORT); + let meta = Meta { + request_processor: Some(request_processor), + }; + let req = format!( r#"{{"jsonrpc":"2.0","id":1,"method":"solana_getBalance","params":["{}"]}}"#, bob_pubkey ); - let meta = Meta { - leader: Some(leader_data.clone()), - keypair_location: None, - }; - let res = io.handle_request_sync(&req, meta.clone()); - let expected = r#"{"jsonrpc":"2.0","result":0,"id":1}"#; + let expected = format!( + r#"{{"jsonrpc":"2.0","result":["{}", 20],"id":1}}"#, + bob_pubkey + ); let expected: Response = - serde_json::from_str(expected).expect("expected response deserialization"); + serde_json::from_str(&expected).expect("expected response deserialization"); let result: Response = serde_json::from_str(&res.expect("actual response")) .expect("actual response deserialization"); assert_eq!(expected, result); - let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket"); - addr.set_ip(get_ip_addr().expect("drone get_ip_addr")); - let mut drone = Drone::new( - alice.keypair(), - addr, - leader_data.contact_info.tpu, - leader_data.contact_info.rpu, - None, - Some(150_000), - ); + let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"solana_getTransactionCount"}}"#); + let res = io.handle_request_sync(&req, meta.clone()); + let expected = format!(r#"{{"jsonrpc":"2.0","result":1,"id":1}}"#); + let expected: Response = + serde_json::from_str(&expected).expect("expected response deserialization"); - let bob_req = DroneRequest::GetAirdrop { - airdrop_request_amount: 50, - client_pubkey: bob_pubkey, - }; - drone.send_airdrop(bob_req).unwrap(); - - let res1 = io.handle_request_sync(&req, meta); - let expected1 = r#"{"jsonrpc":"2.0","result":50,"id":1}"#; - let expected1: Response = - serde_json::from_str(expected1).expect("expected response deserialization"); - - let result1: Response = serde_json::from_str(&res1.expect("actual response")) + let result: Response = serde_json::from_str(&res.expect("actual response")) .expect("actual response deserialization"); - assert_eq!(expected1, result1); - - exit.store(true, Ordering::Relaxed); - server.join().unwrap(); - remove_dir_all(ledger_path).unwrap(); + assert_eq!(expected, result); } #[test] fn test_rpc_request_bad_parameter_type() { @@ -239,8 +328,7 @@ mod tests { io.extend_with(rpc.to_delegate()); let req = r#"{"jsonrpc":"2.0","id":1,"method":"solana_getBalance","params":[1234567890]}"#; let meta = Meta { - leader: None, - keypair_location: None, + request_processor: None, }; let res = io.handle_request_sync(req, meta); @@ -253,15 +341,14 @@ mod tests { assert_eq!(expected, result); } #[test] - fn test_rpc_request_bad_pubkey() { + fn test_rpc_request_bad_signature() { let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; io.extend_with(rpc.to_delegate()); let req = - r#"{"jsonrpc":"2.0","id":1,"method":"solana_getBalance","params":["a1b2c3d4e5"]}"#; + r#"{"jsonrpc":"2.0","id":1,"method":"solana_confirmTransaction","params":["a1b2c3d4e5"]}"#; let meta = Meta { - leader: None, - keypair_location: None, + request_processor: None, }; let res = io.handle_request_sync(req, meta); @@ -274,5 +361,4 @@ mod tests { .expect("actual response deserialization"); assert_eq!(expected, result); } - }