Implement Rpc in Fullnode

This commit is contained in:
Tyera Eulberg 2018-08-14 18:03:48 -06:00 committed by Tyera Eulberg
parent d3c09b4e96
commit c6662a4512
4 changed files with 231 additions and 221 deletions

View File

@ -53,10 +53,6 @@ path = "src/bin/keygen.rs"
name = "solana-wallet"
path = "src/bin/wallet.rs"
[[bin]]
name = "solana-json-rpc"
path = "src/bin/json-rpc.rs"
[badges]
codecov = { repository = "solana-labs/solana", branch = "master", service = "github" }

View File

@ -1,83 +0,0 @@
//! The `json-rpc` service launches an HTTP server to listen for
//! Solana rpc requests.
#[macro_use]
extern crate clap;
extern crate dirs;
extern crate jsonrpc_core;
extern crate jsonrpc_http_server;
extern crate solana;
use clap::{App, Arg};
use jsonrpc_core::*;
use jsonrpc_http_server::*;
use solana::crdt::NodeInfo;
use solana::fullnode::Config;
use solana::rpc::*;
use std::error;
use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::result::Result;
fn main() -> Result<(), Box<error::Error>> {
let matches = App::new("json-rpc")
.version(crate_version!())
.arg(
Arg::with_name("leader")
.short("l")
.long("leader")
.value_name("PATH")
.takes_value(true)
.help("/path/to/leader.json"),
)
.arg(
Arg::with_name("keypair")
.short("k")
.long("keypair")
.value_name("PATH")
.takes_value(true)
.required(true)
.help("/path/to/id.json"),
)
.get_matches();
let leader: NodeInfo;
if let Some(l) = matches.value_of("leader") {
leader = read_leader(l).node_info;
} else {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = NodeInfo::new_leader(&server_addr);
};
let id_path: String = matches
.value_of("keypair")
.expect("Unable to parse keypair path")
.to_string();
let mut io = MetaIoHandler::default();
let rpc = RpcSolImpl;
io.extend_with(rpc.to_delegate());
let rpc_addr = format!("0.0.0.0:{}", RPC_PORT);
let server = ServerBuilder::new(io)
.meta_extractor(move |_req: &hyper::Request| Meta {
leader: Some(leader.clone()),
keypair_location: Some(id_path.clone()),
})
.threads(4)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
]))
.start_http(
&rpc_addr
.parse()
.expect("Unable to parse RPC server address"),
)?;
server.wait();
Ok(())
}
fn read_leader(path: &str) -> Config {
let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path));
serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path))
}

View File

@ -7,6 +7,7 @@ use entry::Entry;
use ledger::read_ledger;
use ncp::Ncp;
use packet::BlobRecycler;
use rpc::{JsonRpcService, RPC_PORT};
use rpu::Rpu;
use service::Service;
use signature::{Keypair, KeypairUtil};
@ -190,7 +191,7 @@ impl Fullnode {
let tick_duration = None;
// TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000));
// let node_info = node.data.clone();
let bank = Arc::new(bank);
let mut thread_hdls = vec![];
let rpu = Rpu::new(
@ -201,6 +202,11 @@ impl Fullnode {
);
thread_hdls.extend(rpu.thread_hdls());
let mut rpc_addr = node.data.contact_info.ncp;
rpc_addr.set_port(RPC_PORT);
let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr);
thread_hdls.extend(rpc_service.thread_hdls());
let blob_recycler = BlobRecycler::default();
let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler);
@ -292,6 +298,11 @@ impl Fullnode {
);
thread_hdls.extend(rpu.thread_hdls());
let mut rpc_addr = node.data.contact_info.ncp;
rpc_addr.set_port(RPC_PORT);
let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr);
thread_hdls.extend(rpc_service.thread_hdls());
let blob_recycler = BlobRecycler::default();
let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler);

View File

@ -1,25 +1,70 @@
//! The `rpc` module implements the Solana rpc interface.
use bank::Bank;
use bs58;
use client::mk_client;
use crdt::NodeInfo;
use jsonrpc_core::*;
use signature::{read_keypair, KeypairUtil, Pubkey, Signature};
use jsonrpc_http_server::*;
use request::{Request as JsonRpcRequest, Response};
use service::Service;
use signature::{Pubkey, Signature};
use std::mem;
use std::net::SocketAddr;
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
pub const RPC_PORT: u16 = 8899;
pub struct JsonRpcService {
thread_hdl: JoinHandle<()>,
}
impl JsonRpcService {
pub fn new(bank: Arc<Bank>, rpc_addr: SocketAddr) -> Self {
let request_processor = JsonRpcRequestProcessor::new(bank);
let thread_hdl = Builder::new()
.name("solana-jsonrpc".to_string())
.spawn(move || {
let mut io = MetaIoHandler::default();
let rpc = RpcSolImpl;
io.extend_with(rpc.to_delegate());
let server = ServerBuilder::new(io)
.meta_extractor(move |_req: &hyper::Request| Meta {
request_processor: Some(request_processor.clone()),
})
.threads(4)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
]))
.start_http(&rpc_addr)
.unwrap();
server.wait();
()
})
.unwrap();
JsonRpcService { thread_hdl }
}
}
impl Service for JsonRpcService {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
#[derive(Clone)]
pub struct Meta {
pub leader: Option<NodeInfo>,
pub keypair_location: Option<String>,
pub request_processor: Option<JsonRpcRequestProcessor>,
}
impl Metadata for Meta {}
impl Default for Meta {
fn default() -> Self {
Meta {
leader: None,
keypair_location: None,
request_processor: None,
}
}
}
@ -28,14 +73,14 @@ build_rpc_trait! {
pub trait RpcSol {
type Metadata;
#[rpc(meta, name = "solana_getAddress")]
fn address(&self, Self::Metadata) -> Result<String>;
#[rpc(meta, name = "solana_confirmTransaction")]
fn confirm_transaction(&self, Self::Metadata, String) -> Result<bool>;
#[rpc(meta, name = "solana_getBalance")]
fn get_balance(&self, Self::Metadata, String) -> Result<i64>;
fn get_balance(&self, Self::Metadata, String) -> Result<(String, i64)>;
#[rpc(meta, name = "solana_getFinality")]
fn get_finality(&self, Self::Metadata) -> Result<usize>;
#[rpc(meta, name = "solana_getLastId")]
fn get_last_id(&self, Self::Metadata) -> Result<String>;
@ -43,8 +88,8 @@ build_rpc_trait! {
#[rpc(meta, name = "solana_getTransactionCount")]
fn get_transaction_count(&self, Self::Metadata) -> Result<u64>;
#[rpc(meta, name = "solana_sendTransaction")]
fn send_transaction(&self, Self::Metadata, String, i64) -> Result<String>;
// #[rpc(meta, name = "solana_sendTransaction")]
// fn send_transaction(&self, Self::Metadata, String, i64) -> Result<String>;
}
}
@ -52,10 +97,6 @@ pub struct RpcSolImpl;
impl RpcSol for RpcSolImpl {
type Metadata = Meta;
fn address(&self, meta: Self::Metadata) -> Result<String> {
let client_keypair = read_keypair(&meta.keypair_location.unwrap());
Ok(bs58::encode(client_keypair.unwrap().pubkey()).into_string())
}
fn confirm_transaction(&self, meta: Self::Metadata, id: String) -> Result<bool> {
let signature_vec = bs58::decode(id)
.into_vec()
@ -65,14 +106,24 @@ impl RpcSol for RpcSolImpl {
Err(Error::invalid_request())
} else {
let signature = Signature::new(&signature_vec);
let mut client = mk_client(&meta.leader.unwrap());
let confirmation = client.check_signature(&signature);
Ok(confirmation)
let req = JsonRpcRequest::GetSignature { signature };
let resp = meta.request_processor.unwrap().process_request(req);
match resp {
Some(Response::SignatureStatus { signature_status }) => Ok(signature_status),
Some(_) => Err(Error {
code: ErrorCode::ServerError(-32002),
message: "Server error: bad response".to_string(),
data: None,
}),
None => Err(Error {
code: ErrorCode::ServerError(-32001),
message: "Server error: no node found".to_string(),
data: None,
}),
}
}
}
fn get_balance(&self, meta: Self::Metadata, id: String) -> Result<i64> {
fn get_balance(&self, meta: Self::Metadata, id: String) -> Result<(String, i64)> {
let pubkey_vec = bs58::decode(id)
.into_vec()
.expect("base58-encoded public key");
@ -81,13 +132,16 @@ impl RpcSol for RpcSolImpl {
Err(Error::invalid_request())
} else {
let pubkey = Pubkey::new(&pubkey_vec);
let mut client = mk_client(&meta.leader.unwrap());
let balance = client.poll_get_balance(&pubkey);
match balance {
Ok(balance) => Ok(balance),
Err(_) => Err(Error {
let req = JsonRpcRequest::GetBalance { key: pubkey };
let resp = meta.request_processor.unwrap().process_request(req);
match resp {
Some(Response::Balance { key, val }) => Ok((bs58::encode(key).into_string(), val)),
Some(_) => Err(Error {
code: ErrorCode::ServerError(-32002),
message: "Server error: bad response".to_string(),
data: None,
}),
None => Err(Error {
code: ErrorCode::ServerError(-32001),
message: "Server error: no node found".to_string(),
data: None,
@ -95,32 +149,119 @@ impl RpcSol for RpcSolImpl {
}
}
}
fn get_finality(&self, meta: Self::Metadata) -> Result<usize> {
let req = JsonRpcRequest::GetFinality;
let resp = meta.request_processor.unwrap().process_request(req);
match resp {
Some(Response::Finality { time }) => Ok(time),
Some(_) => Err(Error {
code: ErrorCode::ServerError(-32002),
message: "Server error: bad response".to_string(),
data: None,
}),
None => Err(Error {
code: ErrorCode::ServerError(-32001),
message: "Server error: no node found".to_string(),
data: None,
}),
}
}
fn get_last_id(&self, meta: Self::Metadata) -> Result<String> {
let mut client = mk_client(&meta.leader.unwrap());
let last_id = client.get_last_id();
Ok(bs58::encode(last_id).into_string())
let req = JsonRpcRequest::GetLastId;
let resp = meta.request_processor.unwrap().process_request(req);
match resp {
Some(Response::LastId { id }) => Ok(bs58::encode(id).into_string()),
Some(_) => Err(Error {
code: ErrorCode::ServerError(-32002),
message: "Server error: bad response".to_string(),
data: None,
}),
None => Err(Error {
code: ErrorCode::ServerError(-32001),
message: "Server error: no node found".to_string(),
data: None,
}),
}
}
fn get_transaction_count(&self, meta: Self::Metadata) -> Result<u64> {
let mut client = mk_client(&meta.leader.unwrap());
let tx_count = client.transaction_count();
Ok(tx_count)
let req = JsonRpcRequest::GetTransactionCount;
let resp = meta.request_processor.unwrap().process_request(req);
match resp {
Some(Response::TransactionCount { transaction_count }) => Ok(transaction_count),
Some(_) => Err(Error {
code: ErrorCode::ServerError(-32002),
message: "Server error: bad response".to_string(),
data: None,
}),
None => Err(Error {
code: ErrorCode::ServerError(-32001),
message: "Server error: no node found".to_string(),
data: None,
}),
}
}
// fn send_transaction(&self, meta: Self::Metadata, to: String, tokens: i64) -> Result<String> {
// let client_keypair = read_keypair(&meta.keypair_location.unwrap()).unwrap();
// let mut client = mk_client(&meta.leader.unwrap());
// let last_id = client.get_last_id();
// let to_pubkey_vec = bs58::decode(to)
// .into_vec()
// .expect("base58-encoded public key");
//
// if to_pubkey_vec.len() != mem::size_of::<Pubkey>() {
// Err(Error::invalid_request())
// } else {
// let to_pubkey = Pubkey::new(&to_pubkey_vec);
// let signature = client
// .transfer(tokens, &client_keypair, to_pubkey, &last_id)
// .unwrap();
// Ok(bs58::encode(signature).into_string())
// }
// }
}
#[derive(Clone)]
pub struct JsonRpcRequestProcessor {
bank: Arc<Bank>,
}
impl JsonRpcRequestProcessor {
/// Create a new request processor that wraps the given Bank.
pub fn new(bank: Arc<Bank>) -> Self {
JsonRpcRequestProcessor { bank }
}
fn send_transaction(&self, meta: Self::Metadata, to: String, tokens: i64) -> Result<String> {
let client_keypair = read_keypair(&meta.keypair_location.unwrap()).unwrap();
let mut client = mk_client(&meta.leader.unwrap());
let last_id = client.get_last_id();
let to_pubkey_vec = bs58::decode(to)
.into_vec()
.expect("base58-encoded public key");
if to_pubkey_vec.len() != mem::size_of::<Pubkey>() {
Err(Error::invalid_request())
} else {
let to_pubkey = Pubkey::new(&to_pubkey_vec);
let signature = client
.transfer(tokens, &client_keypair, to_pubkey, &last_id)
.unwrap();
Ok(bs58::encode(signature).into_string())
/// Process Request items sent via JSON-RPC.
fn process_request(&self, msg: JsonRpcRequest) -> Option<Response> {
match msg {
JsonRpcRequest::GetBalance { key } => {
let val = self.bank.get_balance(&key);
let rsp = Response::Balance { key, val };
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
JsonRpcRequest::GetLastId => {
let id = self.bank.last_id();
let rsp = Response::LastId { id };
info!("Response::LastId {:?}", rsp);
Some(rsp)
}
JsonRpcRequest::GetTransactionCount => {
let transaction_count = self.bank.transaction_count() as u64;
let rsp = Response::TransactionCount { transaction_count };
info!("Response::TransactionCount {:?}", rsp);
Some(rsp)
}
JsonRpcRequest::GetSignature { signature } => {
let signature_status = self.bank.has_signature(&signature);
let rsp = Response::SignatureStatus { signature_status };
info!("Response::Signature {:?}", rsp);
Some(rsp)
}
JsonRpcRequest::GetFinality => {
let time = self.bank.finality();
let rsp = Response::Finality { time };
info!("Response::Finality {:?}", rsp);
Some(rsp)
}
}
}
}
@ -129,108 +270,56 @@ impl RpcSol for RpcSolImpl {
mod tests {
use super::*;
use bank::Bank;
use crdt::{get_ip_addr, TestNode};
use drone::{Drone, DroneRequest};
use fullnode::FullNode;
use jsonrpc_core::Response;
use ledger::LedgerWriter;
use mint::Mint;
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
fn tmp_ledger(name: &str, mint: &Mint) -> String {
let keypair = Keypair::new();
let path = format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey());
let mut writer = LedgerWriter::open(&path, true).unwrap();
writer.write_entries(mint.create_entries()).unwrap();
path
}
use transaction::Transaction;
#[test]
fn test_rpc_request() {
let leader_keypair = Keypair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let bob_pubkey = Keypair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let ledger_path = tmp_ledger("rpc_request", &alice);
let bank = Bank::new(&alice);
let server = FullNode::new_leader(
leader_keypair,
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
&ledger_path,
false,
);
sleep(Duration::from_millis(900));
let last_id = bank.last_id();
let tx = Transaction::new(&alice.keypair(), bob_pubkey, 20, last_id);
bank.process_transaction(&tx).expect("process transaction");
let request_processor = JsonRpcRequestProcessor::new(Arc::new(bank));
let mut io = MetaIoHandler::default();
let rpc = RpcSolImpl;
io.extend_with(rpc.to_delegate());
format!("0.0.0.0:{}", RPC_PORT);
let meta = Meta {
request_processor: Some(request_processor),
};
let req = format!(
r#"{{"jsonrpc":"2.0","id":1,"method":"solana_getBalance","params":["{}"]}}"#,
bob_pubkey
);
let meta = Meta {
leader: Some(leader_data.clone()),
keypair_location: None,
};
let res = io.handle_request_sync(&req, meta.clone());
let expected = r#"{"jsonrpc":"2.0","result":0,"id":1}"#;
let expected = format!(
r#"{{"jsonrpc":"2.0","result":["{}", 20],"id":1}}"#,
bob_pubkey
);
let expected: Response =
serde_json::from_str(expected).expect("expected response deserialization");
serde_json::from_str(&expected).expect("expected response deserialization");
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
assert_eq!(expected, result);
let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket");
addr.set_ip(get_ip_addr().expect("drone get_ip_addr"));
let mut drone = Drone::new(
alice.keypair(),
addr,
leader_data.contact_info.tpu,
leader_data.contact_info.rpu,
None,
Some(150_000),
);
let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"solana_getTransactionCount"}}"#);
let res = io.handle_request_sync(&req, meta.clone());
let expected = format!(r#"{{"jsonrpc":"2.0","result":1,"id":1}}"#);
let expected: Response =
serde_json::from_str(&expected).expect("expected response deserialization");
let bob_req = DroneRequest::GetAirdrop {
airdrop_request_amount: 50,
client_pubkey: bob_pubkey,
};
drone.send_airdrop(bob_req).unwrap();
let res1 = io.handle_request_sync(&req, meta);
let expected1 = r#"{"jsonrpc":"2.0","result":50,"id":1}"#;
let expected1: Response =
serde_json::from_str(expected1).expect("expected response deserialization");
let result1: Response = serde_json::from_str(&res1.expect("actual response"))
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
assert_eq!(expected1, result1);
exit.store(true, Ordering::Relaxed);
server.join().unwrap();
remove_dir_all(ledger_path).unwrap();
assert_eq!(expected, result);
}
#[test]
fn test_rpc_request_bad_parameter_type() {
@ -239,8 +328,7 @@ mod tests {
io.extend_with(rpc.to_delegate());
let req = r#"{"jsonrpc":"2.0","id":1,"method":"solana_getBalance","params":[1234567890]}"#;
let meta = Meta {
leader: None,
keypair_location: None,
request_processor: None,
};
let res = io.handle_request_sync(req, meta);
@ -253,15 +341,14 @@ mod tests {
assert_eq!(expected, result);
}
#[test]
fn test_rpc_request_bad_pubkey() {
fn test_rpc_request_bad_signature() {
let mut io = MetaIoHandler::default();
let rpc = RpcSolImpl;
io.extend_with(rpc.to_delegate());
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"solana_getBalance","params":["a1b2c3d4e5"]}"#;
r#"{"jsonrpc":"2.0","id":1,"method":"solana_confirmTransaction","params":["a1b2c3d4e5"]}"#;
let meta = Meta {
leader: None,
keypair_location: None,
request_processor: None,
};
let res = io.handle_request_sync(req, meta);
@ -274,5 +361,4 @@ mod tests {
.expect("actual response deserialization");
assert_eq!(expected, result);
}
}