diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 000000000..8f8e1a17c --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "solana-client" +version = "0.13.0" +description = "Solana Client" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +homepage = "https://solana.com/" +license = "Apache-2.0" +edition = "2018" + +[dependencies] +bincode = "1.1.2" +bs58 = "0.2.0" +log = "0.4.2" +reqwest = "0.9.11" +serde_json = "1.0.39" +solana-metrics = { path = "../metrics", version = "0.13.0" } +solana-sdk = { path = "../sdk", version = "0.13.0" } + +[dev-dependencies] +jsonrpc-core = "10.1.0" +jsonrpc-http-server = "10.1.0" +solana-logger = { path = "../logger", version = "0.13.0" } diff --git a/client/src/lib.rs b/client/src/lib.rs new file mode 100644 index 000000000..257f7b068 --- /dev/null +++ b/client/src/lib.rs @@ -0,0 +1,3 @@ +pub mod rpc_mock; +pub mod rpc_request; +pub mod thin_client; diff --git a/client/src/rpc_mock.rs b/client/src/rpc_mock.rs new file mode 100644 index 000000000..ba18f3b61 --- /dev/null +++ b/client/src/rpc_mock.rs @@ -0,0 +1,111 @@ +// Implementation of RpcRequestHandler trait for testing Rpc requests without i/o + +use crate::rpc_request::{RpcRequest, RpcRequestHandler}; +use serde_json::{json, Number, Value}; +use solana_sdk::hash::Hash; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::{Keypair, KeypairUtil}; +use solana_sdk::system_transaction::SystemTransaction; +use solana_sdk::transaction::Transaction; +use std::error; +use std::io::{Error, ErrorKind}; +use std::net::SocketAddr; + +pub const PUBKEY: &str = "7RoSF9fUmdphVCpabEoefH81WwrW7orsWonXWqTXkKV8"; +pub const SIGNATURE: &str = + "43yNSFC6fYTuPgTNFFhF4axw7AfWxB2BPdurme8yrsWEYwm8299xh8n6TAHjGymiSub1XtyxTNyd9GBfY2hxoBw8"; + +#[derive(Clone)] +pub struct MockRpcClient { + pub addr: String, +} + +impl MockRpcClient { + pub fn new(addr: String) -> Self { + MockRpcClient { addr } + } + + pub fn retry_get_balance( + &self, + id: u64, + pubkey: &Pubkey, + retries: usize, + ) -> Result, Box> { + let params = json!([format!("{}", pubkey)]); + let res = self + .retry_make_rpc_request(id, &RpcRequest::GetBalance, Some(params), retries)? + .as_u64(); + Ok(res) + } + + pub fn retry_make_rpc_request( + &self, + _id: u64, + request: &RpcRequest, + params: Option, + mut _retries: usize, + ) -> Result> { + if self.addr == "fails" { + return Ok(Value::Null); + } + let val = match request { + RpcRequest::ConfirmTransaction => { + if let Some(Value::Array(param_array)) = params { + if let Value::String(param_string) = ¶m_array[0] { + Value::Bool(param_string == SIGNATURE) + } else { + Value::Null + } + } else { + Value::Null + } + } + RpcRequest::GetBalance => { + let n = if self.addr == "airdrop" { 0 } else { 50 }; + Value::Number(Number::from(n)) + } + RpcRequest::GetRecentBlockhash => Value::String(PUBKEY.to_string()), + RpcRequest::GetSignatureStatus => { + let str = if self.addr == "account_in_use" { + "AccountInUse" + } else if self.addr == "bad_sig_status" { + "Nonexistent" + } else { + "Confirmed" + }; + Value::String(str.to_string()) + } + RpcRequest::GetTransactionCount => Value::Number(Number::from(1234)), + RpcRequest::SendTransaction => Value::String(SIGNATURE.to_string()), + _ => Value::Null, + }; + Ok(val) + } +} + +impl RpcRequestHandler for MockRpcClient { + fn make_rpc_request( + &self, + id: u64, + request: RpcRequest, + params: Option, + ) -> Result> { + self.retry_make_rpc_request(id, &request, params, 0) + } +} + +pub fn request_airdrop_transaction( + _drone_addr: &SocketAddr, + _id: &Pubkey, + lamports: u64, + _blockhash: Hash, +) -> Result { + if lamports == 0 { + Err(Error::new(ErrorKind::Other, "Airdrop failed"))? + } + let key = Keypair::new(); + let to = Keypair::new().pubkey(); + let blockhash = Hash::default(); + let tx = SystemTransaction::new_account(&key, &to, lamports, blockhash, 0); + Ok(tx) +} diff --git a/client/src/rpc_request.rs b/client/src/rpc_request.rs new file mode 100644 index 000000000..3690f6469 --- /dev/null +++ b/client/src/rpc_request.rs @@ -0,0 +1,331 @@ +use log::*; +use reqwest; +use reqwest::header::CONTENT_TYPE; +use serde_json::{json, Value}; +use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND}; +use std::net::SocketAddr; +use std::thread::sleep; +use std::time::Duration; +use std::{error, fmt}; + +use solana_sdk::pubkey::Pubkey; + +#[derive(Clone)] +pub struct RpcClient { + pub client: reqwest::Client, + pub addr: String, +} + +impl RpcClient { + pub fn new(addr: String) -> Self { + RpcClient { + client: reqwest::Client::new(), + addr, + } + } + + pub fn new_with_timeout(addr: SocketAddr, timeout: Duration) -> Self { + let addr = get_rpc_request_str(addr, false); + let client = reqwest::Client::builder() + .timeout(timeout) + .build() + .expect("build rpc client"); + RpcClient { client, addr } + } + + pub fn new_from_socket(addr: SocketAddr) -> Self { + Self::new(get_rpc_request_str(addr, false)) + } + + pub fn retry_get_balance( + &self, + id: u64, + pubkey: &Pubkey, + retries: usize, + ) -> Result, Box> { + let params = json!([format!("{}", pubkey)]); + let res = self + .retry_make_rpc_request(id, &RpcRequest::GetBalance, Some(params), retries)? + .as_u64(); + Ok(res) + } + + pub fn retry_make_rpc_request( + &self, + id: u64, + request: &RpcRequest, + params: Option, + mut retries: usize, + ) -> Result> { + let request_json = request.build_request_json(id, params); + + loop { + match self + .client + .post(&self.addr) + .header(CONTENT_TYPE, "application/json") + .body(request_json.to_string()) + .send() + { + Ok(mut response) => { + let json: Value = serde_json::from_str(&response.text()?)?; + if json["error"].is_object() { + Err(RpcError::RpcRequestError(format!( + "RPC Error response: {}", + serde_json::to_string(&json["error"]).unwrap() + )))? + } + return Ok(json["result"].clone()); + } + Err(e) => { + info!( + "make_rpc_request() failed, {} retries left: {:?}", + retries, e + ); + if retries == 0 { + Err(e)?; + } + retries -= 1; + + // Sleep for approximately half a slot + sleep(Duration::from_millis( + 500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND, + )); + } + } + } + } +} + +pub fn get_rpc_request_str(rpc_addr: SocketAddr, tls: bool) -> String { + if tls { + format!("https://{}", rpc_addr) + } else { + format!("http://{}", rpc_addr) + } +} + +pub trait RpcRequestHandler { + fn make_rpc_request( + &self, + id: u64, + request: RpcRequest, + params: Option, + ) -> Result>; +} + +impl RpcRequestHandler for RpcClient { + fn make_rpc_request( + &self, + id: u64, + request: RpcRequest, + params: Option, + ) -> Result> { + self.retry_make_rpc_request(id, &request, params, 0) + } +} + +#[derive(Debug, PartialEq)] +pub enum RpcRequest { + ConfirmTransaction, + GetAccountInfo, + GetBalance, + GetRecentBlockhash, + GetSignatureStatus, + GetTransactionCount, + RequestAirdrop, + SendTransaction, + RegisterNode, + SignVote, + DeregisterNode, + GetStorageBlockhash, + GetStorageEntryHeight, + GetStoragePubkeysForEntryHeight, + FullnodeExit, +} + +impl RpcRequest { + fn build_request_json(&self, id: u64, params: Option) -> Value { + let jsonrpc = "2.0"; + let method = match self { + RpcRequest::ConfirmTransaction => "confirmTransaction", + RpcRequest::GetAccountInfo => "getAccountInfo", + RpcRequest::GetBalance => "getBalance", + RpcRequest::GetRecentBlockhash => "getRecentBlockhash", + RpcRequest::GetSignatureStatus => "getSignatureStatus", + RpcRequest::GetTransactionCount => "getTransactionCount", + RpcRequest::RequestAirdrop => "requestAirdrop", + RpcRequest::SendTransaction => "sendTransaction", + RpcRequest::RegisterNode => "registerNode", + RpcRequest::SignVote => "signVote", + RpcRequest::DeregisterNode => "deregisterNode", + RpcRequest::GetStorageBlockhash => "getStorageBlockhash", + RpcRequest::GetStorageEntryHeight => "getStorageEntryHeight", + RpcRequest::GetStoragePubkeysForEntryHeight => "getStoragePubkeysForEntryHeight", + RpcRequest::FullnodeExit => "fullnodeExit", + }; + let mut request = json!({ + "jsonrpc": jsonrpc, + "id": id, + "method": method, + }); + if let Some(param_string) = params { + request["params"] = param_string; + } + request + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum RpcError { + RpcRequestError(String), +} + +impl fmt::Display for RpcError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "invalid") + } +} + +impl error::Error for RpcError { + fn description(&self) -> &str { + "invalid" + } + + fn cause(&self) -> Option<&dyn error::Error> { + // Generic error, underlying cause isn't tracked. + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + use jsonrpc_core::{Error, IoHandler, Params}; + use jsonrpc_http_server::{AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; + use serde_json::Number; + use solana_logger; + use std::sync::mpsc::channel; + use std::thread; + + #[test] + fn test_build_request_json() { + let test_request = RpcRequest::GetAccountInfo; + let addr = json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx"]); + let request = test_request.build_request_json(1, Some(addr.clone())); + assert_eq!(request["method"], "getAccountInfo"); + assert_eq!(request["params"], addr,); + + let test_request = RpcRequest::GetBalance; + let request = test_request.build_request_json(1, Some(addr)); + assert_eq!(request["method"], "getBalance"); + + let test_request = RpcRequest::GetRecentBlockhash; + let request = test_request.build_request_json(1, None); + assert_eq!(request["method"], "getRecentBlockhash"); + + let test_request = RpcRequest::GetTransactionCount; + let request = test_request.build_request_json(1, None); + assert_eq!(request["method"], "getTransactionCount"); + + let test_request = RpcRequest::RequestAirdrop; + let request = test_request.build_request_json(1, None); + assert_eq!(request["method"], "requestAirdrop"); + + let test_request = RpcRequest::SendTransaction; + let request = test_request.build_request_json(1, None); + assert_eq!(request["method"], "sendTransaction"); + } + #[test] + fn test_make_rpc_request() { + let (sender, receiver) = channel(); + thread::spawn(move || { + let rpc_addr = "0.0.0.0:0".parse().unwrap(); + let mut io = IoHandler::default(); + // Successful request + io.add_method("getBalance", |_params: Params| { + Ok(Value::Number(Number::from(50))) + }); + // Failed request + io.add_method("getRecentBlockhash", |params: Params| { + if params != Params::None { + Err(Error::invalid_request()) + } else { + Ok(Value::String( + "deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx".to_string(), + )) + } + }); + + let server = ServerBuilder::new(io) + .threads(1) + .cors(DomainsValidation::AllowOnly(vec![ + AccessControlAllowOrigin::Any, + ])) + .start_http(&rpc_addr) + .expect("Unable to start RPC server"); + sender.send(*server.address()).unwrap(); + server.wait(); + }); + + let rpc_addr = receiver.recv().unwrap(); + let rpc_client = RpcClient::new_from_socket(rpc_addr); + + let balance = rpc_client.make_rpc_request( + 1, + RpcRequest::GetBalance, + Some(json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx"])), + ); + assert_eq!(balance.unwrap().as_u64().unwrap(), 50); + + let blockhash = rpc_client.make_rpc_request(2, RpcRequest::GetRecentBlockhash, None); + assert_eq!( + blockhash.unwrap().as_str().unwrap(), + "deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx" + ); + + // Send erroneous parameter + let blockhash = + rpc_client.make_rpc_request(3, RpcRequest::GetRecentBlockhash, Some(json!("paramter"))); + assert_eq!(blockhash.is_err(), true); + } + + #[test] + fn test_retry_make_rpc_request() { + solana_logger::setup(); + let (sender, receiver) = channel(); + thread::spawn(move || { + // 1. Pick a random port + // 2. Tell the client to start using it + // 3. Delay for 1.5 seconds before starting the server to ensure the client will fail + // and need to retry + let rpc_addr: SocketAddr = "0.0.0.0:4242".parse().unwrap(); + sender.send(rpc_addr.clone()).unwrap(); + sleep(Duration::from_millis(1500)); + + let mut io = IoHandler::default(); + io.add_method("getBalance", move |_params: Params| { + Ok(Value::Number(Number::from(5))) + }); + let server = ServerBuilder::new(io) + .threads(1) + .cors(DomainsValidation::AllowOnly(vec![ + AccessControlAllowOrigin::Any, + ])) + .start_http(&rpc_addr) + .expect("Unable to start RPC server"); + server.wait(); + }); + + let rpc_addr = receiver.recv().unwrap(); + let rpc_client = RpcClient::new_from_socket(rpc_addr); + + let balance = rpc_client.retry_make_rpc_request( + 1, + &RpcRequest::GetBalance, + Some(json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhw"])), + 10, + ); + assert_eq!(balance.unwrap().as_u64().unwrap(), 5); + } +} diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs new file mode 100644 index 000000000..80d61a8c3 --- /dev/null +++ b/client/src/thin_client.rs @@ -0,0 +1,411 @@ +//! The `thin_client` module is a client-side object that interfaces with +//! a server-side TPU. Client code should use this object instead of writing +//! messages to the network directly. The binary encoding of its messages are +//! unstable and may change in future releases. + +use crate::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler}; +use bincode::serialize_into; +use bs58; +use log::*; +use serde_json::json; +use solana_metrics; +use solana_metrics::influxdb; +use solana_sdk::account::Account; +use solana_sdk::hash::Hash; +use solana_sdk::packet::PACKET_DATA_SIZE; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; +use solana_sdk::system_transaction::SystemTransaction; +use solana_sdk::timing; +use solana_sdk::transaction::Transaction; +use std; +use std::io; +use std::net::{SocketAddr, UdpSocket}; +use std::thread::sleep; +use std::time::Duration; +use std::time::Instant; + +/// An object for querying and sending transactions to the network. +pub struct ThinClient { + rpc_addr: SocketAddr, + transactions_addr: SocketAddr, + transactions_socket: UdpSocket, + rpc_client: RpcClient, +} + +impl ThinClient { + /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP + /// and the Tpu at `transactions_addr` over `transactions_socket` using UDP. + pub fn new( + rpc_addr: SocketAddr, + transactions_addr: SocketAddr, + transactions_socket: UdpSocket, + ) -> Self { + Self::new_from_client( + rpc_addr, + transactions_addr, + transactions_socket, + RpcClient::new_from_socket(rpc_addr), + ) + } + + pub fn new_with_timeout( + rpc_addr: SocketAddr, + transactions_addr: SocketAddr, + transactions_socket: UdpSocket, + timeout: Duration, + ) -> Self { + let rpc_client = RpcClient::new_with_timeout(rpc_addr, timeout); + Self::new_from_client(rpc_addr, transactions_addr, transactions_socket, rpc_client) + } + + fn new_from_client( + rpc_addr: SocketAddr, + transactions_addr: SocketAddr, + transactions_socket: UdpSocket, + rpc_client: RpcClient, + ) -> Self { + ThinClient { + rpc_client, + rpc_addr, + transactions_addr, + transactions_socket, + } + } + + /// Send a signed Transaction to the server for processing. This method + /// does not wait for a response. + pub fn transfer_signed(&self, transaction: &Transaction) -> io::Result { + let mut buf = vec![0; transaction.serialized_size().unwrap() as usize]; + let mut wr = std::io::Cursor::new(&mut buf[..]); + serialize_into(&mut wr, &transaction) + .expect("serialize Transaction in pub fn transfer_signed"); + assert!(buf.len() < PACKET_DATA_SIZE); + self.transactions_socket + .send_to(&buf[..], &self.transactions_addr)?; + Ok(transaction.signatures[0]) + } + + /// Retry a sending a signed Transaction to the server for processing. + pub fn retry_transfer( + &mut self, + keypair: &Keypair, + transaction: &mut Transaction, + tries: usize, + ) -> io::Result { + for x in 0..tries { + transaction.sign(&[keypair], self.get_recent_blockhash()); + let mut buf = vec![0; transaction.serialized_size().unwrap() as usize]; + let mut wr = std::io::Cursor::new(&mut buf[..]); + serialize_into(&mut wr, &transaction) + .expect("serialize Transaction in pub fn transfer_signed"); + self.transactions_socket + .send_to(&buf[..], &self.transactions_addr)?; + if self.poll_for_signature(&transaction.signatures[0]).is_ok() { + return Ok(transaction.signatures[0]); + } + info!("{} tries failed transfer to {}", x, self.transactions_addr); + } + Err(io::Error::new( + io::ErrorKind::Other, + "retry_transfer failed", + )) + } + + /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. + pub fn transfer( + &self, + lamports: u64, + keypair: &Keypair, + to: &Pubkey, + blockhash: &Hash, + ) -> io::Result { + debug!( + "transfer: lamports={} from={:?} to={:?} blockhash={:?}", + lamports, + keypair.pubkey(), + to, + blockhash + ); + let now = Instant::now(); + let transaction = SystemTransaction::new_account(keypair, to, lamports, *blockhash, 0); + let result = self.transfer_signed(&transaction); + solana_metrics::submit( + influxdb::Point::new("thinclient") + .add_tag("op", influxdb::Value::String("transfer".to_string())) + .add_field( + "duration_ms", + influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64), + ) + .to_owned(), + ); + result + } + + pub fn get_account_userdata(&mut self, pubkey: &Pubkey) -> io::Result>> { + let params = json!([format!("{}", pubkey)]); + let response = + self.rpc_client + .make_rpc_request(1, RpcRequest::GetAccountInfo, Some(params)); + match response { + Ok(account_json) => { + let account: Account = + serde_json::from_value(account_json).expect("deserialize account"); + Ok(Some(account.userdata)) + } + Err(error) => { + debug!("get_account_userdata failed: {:?}", error); + Err(io::Error::new( + io::ErrorKind::Other, + "get_account_userdata failed", + )) + } + } + } + + /// Request the balance of the user holding `pubkey`. This method blocks + /// until the server sends a response. If the response packet is dropped + /// by the network, this method will hang indefinitely. + pub fn get_balance(&mut self, pubkey: &Pubkey) -> io::Result { + trace!("get_balance sending request to {}", self.rpc_addr); + let params = json!([format!("{}", pubkey)]); + let response = + self.rpc_client + .make_rpc_request(1, RpcRequest::GetAccountInfo, Some(params)); + + response + .and_then(|account_json| { + let account: Account = + serde_json::from_value(account_json).expect("deserialize account"); + trace!("Response account {:?} {:?}", pubkey, account); + trace!("get_balance {:?}", account.lamports); + Ok(account.lamports) + }) + .map_err(|error| { + debug!("Response account {}: None (error: {:?})", pubkey, error); + io::Error::new(io::ErrorKind::Other, "AccountNotFound") + }) + } + + /// Request the transaction count. If the response packet is dropped by the network, + /// this method will try again 5 times. + pub fn transaction_count(&mut self) -> u64 { + debug!("transaction_count"); + for _tries in 0..5 { + let response = + self.rpc_client + .make_rpc_request(1, RpcRequest::GetTransactionCount, None); + + match response { + Ok(value) => { + debug!("transaction_count response: {:?}", value); + let transaction_count = value.as_u64().unwrap(); + return transaction_count; + } + Err(error) => { + debug!("transaction_count failed: {:?}", error); + } + }; + } + 0 + } + + /// Request the last Entry ID from the server without blocking. + /// Returns the blockhash Hash or None if there was no response from the server. + pub fn try_get_recent_blockhash(&mut self, mut num_retries: u64) -> Option { + loop { + trace!("try_get_recent_blockhash send_to {}", &self.rpc_addr); + let response = + self.rpc_client + .make_rpc_request(1, RpcRequest::GetRecentBlockhash, None); + + match response { + Ok(value) => { + let blockhash_str = value.as_str().unwrap(); + let blockhash_vec = bs58::decode(blockhash_str).into_vec().unwrap(); + return Some(Hash::new(&blockhash_vec)); + } + Err(error) => { + debug!("thin_client get_recent_blockhash error: {:?}", error); + num_retries -= 1; + if num_retries == 0 { + return None; + } + } + } + } + } + + /// Request the last Entry ID from the server. This method blocks + /// until the server sends a response. + pub fn get_recent_blockhash(&mut self) -> Hash { + loop { + trace!("get_recent_blockhash send_to {}", &self.rpc_addr); + if let Some(hash) = self.try_get_recent_blockhash(10) { + return hash; + } + } + } + + /// Request a new last Entry ID from the server. This method blocks + /// until the server sends a response. + pub fn get_next_blockhash(&mut self, previous_blockhash: &Hash) -> Hash { + self.get_next_blockhash_ext(previous_blockhash, &|| { + sleep(Duration::from_millis(100)); + }) + } + pub fn get_next_blockhash_ext(&mut self, previous_blockhash: &Hash, func: &Fn()) -> Hash { + loop { + let blockhash = self.get_recent_blockhash(); + if blockhash != *previous_blockhash { + break blockhash; + } + debug!("Got same blockhash ({:?}), will retry...", blockhash); + func() + } + } + + pub fn submit_poll_balance_metrics(elapsed: &Duration) { + solana_metrics::submit( + influxdb::Point::new("thinclient") + .add_tag("op", influxdb::Value::String("get_balance".to_string())) + .add_field( + "duration_ms", + influxdb::Value::Integer(timing::duration_as_ms(elapsed) as i64), + ) + .to_owned(), + ); + } + + pub fn poll_balance_with_timeout( + &mut self, + pubkey: &Pubkey, + polling_frequency: &Duration, + timeout: &Duration, + ) -> io::Result { + let now = Instant::now(); + loop { + match self.get_balance(&pubkey) { + Ok(bal) => { + ThinClient::submit_poll_balance_metrics(&now.elapsed()); + return Ok(bal); + } + Err(e) => { + sleep(*polling_frequency); + if now.elapsed() > *timeout { + ThinClient::submit_poll_balance_metrics(&now.elapsed()); + return Err(e); + } + } + }; + } + } + + pub fn poll_get_balance(&mut self, pubkey: &Pubkey) -> io::Result { + self.poll_balance_with_timeout(pubkey, &Duration::from_millis(100), &Duration::from_secs(1)) + } + + /// Poll the server to confirm a transaction. + pub fn poll_for_signature(&mut self, signature: &Signature) -> io::Result<()> { + let now = Instant::now(); + while !self.check_signature(signature) { + if now.elapsed().as_secs() > 15 { + // TODO: Return a better error. + return Err(io::Error::new(io::ErrorKind::Other, "signature not found")); + } + sleep(Duration::from_millis(250)); + } + Ok(()) + } + + /// Check a signature in the bank. This method blocks + /// until the server sends a response. + pub fn check_signature(&mut self, signature: &Signature) -> bool { + trace!("check_signature: {:?}", signature); + let params = json!([format!("{}", signature)]); + let now = Instant::now(); + + loop { + let response = self.rpc_client.make_rpc_request( + 1, + RpcRequest::ConfirmTransaction, + Some(params.clone()), + ); + + match response { + Ok(confirmation) => { + let signature_status = confirmation.as_bool().unwrap(); + if signature_status { + trace!("Response found signature"); + } else { + trace!("Response signature not found"); + } + solana_metrics::submit( + influxdb::Point::new("thinclient") + .add_tag("op", influxdb::Value::String("check_signature".to_string())) + .add_field( + "duration_ms", + influxdb::Value::Integer( + timing::duration_as_ms(&now.elapsed()) as i64 + ), + ) + .to_owned(), + ); + return signature_status; + } + Err(err) => { + debug!("check_signature request failed: {:?}", err); + } + }; + } + } + pub fn fullnode_exit(&mut self) -> io::Result { + trace!("fullnode_exit sending request to {}", self.rpc_addr); + let response = self + .rpc_client + .make_rpc_request(1, RpcRequest::FullnodeExit, None) + .map_err(|error| { + debug!("Response from {} fullndoe_exit: {}", self.rpc_addr, error); + io::Error::new(io::ErrorKind::Other, "FullodeExit request failure") + })?; + serde_json::from_value(response).map_err(|error| { + debug!( + "ParseError: from {} fullndoe_exit: {}", + self.rpc_addr, error + ); + io::Error::new(io::ErrorKind::Other, "FullodeExit parse failure") + }) + } +} + +impl Drop for ThinClient { + fn drop(&mut self) { + solana_metrics::flush(); + } +} + +pub fn retry_get_balance( + client: &mut ThinClient, + bob_pubkey: &Pubkey, + expected_balance: Option, +) -> Option { + const LAST: usize = 30; + for run in 0..LAST { + let balance_result = client.poll_get_balance(bob_pubkey); + if expected_balance.is_none() { + return balance_result.ok(); + } + trace!( + "retry_get_balance[{}] {:?} {:?}", + run, + balance_result, + expected_balance + ); + if let (Some(expected_balance), Ok(balance_result)) = (expected_balance, balance_result) { + if expected_balance == balance_result { + return Some(balance_result); + } + } + } + None +}