//! The `thin_client` module is a client-side object that interfaces with //! a server-side TPU. Client code should use this object instead of writing //! messages to the network directly. The binary encoding of its messages are //! unstable and may change in future releases. use account::Account; use bank::Bank; use bincode::{deserialize, serialize}; use crdt::{Crdt, CrdtError, NodeInfo}; use hash::Hash; use log::Level; use ncp::Ncp; use pubkey::Pubkey; use request::{Request, Response}; use result::{Error, Result}; use signature::{Keypair, Signature}; use std; use std::collections::HashMap; use std::io; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; use std::time::Instant; use system_transaction::SystemTransaction; use timing; use transaction::Transaction; use influx_db_client as influxdb; use metrics; /// An object for querying and sending transactions to the network. pub struct ThinClient { requests_addr: SocketAddr, requests_socket: UdpSocket, transactions_addr: SocketAddr, transactions_socket: UdpSocket, last_id: Option, transaction_count: u64, balances: HashMap, signature_status: bool, finality: Option, } impl ThinClient { /// Create a new ThinClient that will interface with Rpu /// over `requests_socket` and `transactions_socket`. To receive responses, the caller must bind `socket` /// to a public address before invoking ThinClient methods. pub fn new( requests_addr: SocketAddr, requests_socket: UdpSocket, transactions_addr: SocketAddr, transactions_socket: UdpSocket, ) -> Self { ThinClient { requests_addr, requests_socket, transactions_addr, transactions_socket, last_id: None, transaction_count: 0, balances: HashMap::new(), signature_status: false, finality: None, } } pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; trace!("start recv_from"); match self.requests_socket.recv_from(&mut buf) { Ok((len, from)) => { trace!("end recv_from got {} {}", len, from); deserialize(&buf) .or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize"))) } Err(e) => { trace!("end recv_from got {:?}", e); Err(e) } } } pub fn process_response(&mut self, resp: &Response) { match *resp { Response::Account { key, account: Some(ref account), } => { trace!("Response account {:?} {:?}", key, account); self.balances.insert(key, account.clone()); } Response::Account { key, account: None } => { debug!("Response account {}: None ", key); self.balances.remove(&key); } Response::LastId { id } => { trace!("Response last_id {:?}", id); self.last_id = Some(id); } Response::TransactionCount { transaction_count } => { trace!("Response transaction count {:?}", transaction_count); self.transaction_count = transaction_count; } Response::SignatureStatus { signature_status } => { self.signature_status = signature_status; if signature_status { trace!("Response found signature"); } else { trace!("Response signature not found"); } } Response::Finality { time } => { trace!("Response finality {:?}", time); self.finality = Some(time); } } } /// Send a signed Transaction to the server for processing. This method /// does not wait for a response. pub fn transfer_signed(&self, tx: &Transaction) -> io::Result { let data = serialize(&tx).expect("serialize Transaction in pub fn transfer_signed"); self.transactions_socket .send_to(&data, &self.transactions_addr)?; Ok(tx.signature) } /// Retry a sending a signed Transaction to the server for processing. pub fn retry_transfer_signed( &mut self, tx: &Transaction, tries: usize, ) -> io::Result { let data = serialize(&tx).expect("serialize Transaction in pub fn transfer_signed"); for x in 0..tries { self.transactions_socket .send_to(&data, &self.transactions_addr)?; if self.poll_for_signature(&tx.signature).is_ok() { return Ok(tx.signature); } info!("{} tries failed transfer to {}", x, self.transactions_addr); } Err(io::Error::new( io::ErrorKind::Other, "retry_transfer_signed failed", )) } /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. pub fn transfer( &self, n: i64, keypair: &Keypair, to: Pubkey, last_id: &Hash, ) -> io::Result { let now = Instant::now(); let tx = Transaction::system_new(keypair, to, n, *last_id); let result = self.transfer_signed(&tx); metrics::submit( influxdb::Point::new("thinclient") .add_tag("op", influxdb::Value::String("transfer".to_string())) .add_field( "duration_ms", influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64), ).to_owned(), ); result } /// Request the balance of the user holding `pubkey`. This method blocks /// until the server sends a response. If the response packet is dropped /// by the network, this method will hang indefinitely. pub fn get_balance(&mut self, pubkey: &Pubkey) -> io::Result { trace!("get_balance sending request to {}", self.requests_addr); let req = Request::GetAccount { key: *pubkey }; let data = serialize(&req).expect("serialize GetAccount in pub fn get_balance"); self.requests_socket .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_balance"); let mut done = false; while !done { let resp = self.recv_response()?; trace!("recv_response {:?}", resp); if let Response::Account { key, .. } = &resp { done = key == pubkey; } self.process_response(&resp); } trace!("get_balance {:?}", self.balances.get(pubkey)); // TODO: This is a hard coded call to introspect the balance of a budget_dsl contract // In the future custom contracts would need their own introspection self.balances .get(pubkey) .map(Bank::read_balance) .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "AccountNotFound")) } /// Request the finality from the leader node pub fn get_finality(&mut self) -> usize { trace!("get_finality"); let req = Request::GetFinality; let data = serialize(&req).expect("serialize GetFinality in pub fn get_finality"); let mut done = false; while !done { debug!("get_finality send_to {}", &self.requests_addr); self.requests_socket .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_finality"); match self.recv_response() { Ok(resp) => { if let Response::Finality { .. } = resp { done = true; } self.process_response(&resp); } Err(e) => { debug!("thin_client get_finality error: {}", e); } } } self.finality.expect("some finality") } /// Request the transaction count. If the response packet is dropped by the network, /// this method will try again 5 times. pub fn transaction_count(&mut self) -> u64 { debug!("transaction_count"); let req = Request::GetTransactionCount; let data = serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count"); let mut tries_left = 5; while tries_left > 0 { self.requests_socket .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn transaction_count"); if let Ok(resp) = self.recv_response() { debug!("transaction_count recv_response: {:?}", resp); if let Response::TransactionCount { .. } = resp { tries_left = 0; } self.process_response(&resp); } else { tries_left -= 1; } } self.transaction_count } /// Request the last Entry ID from the server. This method blocks /// until the server sends a response. pub fn get_last_id(&mut self) -> Hash { trace!("get_last_id"); let req = Request::GetLastId; let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id"); let mut done = false; while !done { debug!("get_last_id send_to {}", &self.requests_addr); self.requests_socket .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_last_id"); match self.recv_response() { Ok(resp) => { if let Response::LastId { .. } = resp { done = true; } self.process_response(&resp); } Err(e) => { debug!("thin_client get_last_id error: {}", e); } } } self.last_id.expect("some last_id") } pub fn submit_poll_balance_metrics(elapsed: &Duration) { metrics::submit( influxdb::Point::new("thinclient") .add_tag("op", influxdb::Value::String("get_balance".to_string())) .add_field( "duration_ms", influxdb::Value::Integer(timing::duration_as_ms(elapsed) as i64), ).to_owned(), ); } pub fn poll_balance_with_timeout( &mut self, pubkey: &Pubkey, polling_frequency: &Duration, timeout: &Duration, ) -> io::Result { let now = Instant::now(); loop { match self.get_balance(&pubkey) { Ok(bal) => { ThinClient::submit_poll_balance_metrics(&now.elapsed()); return Ok(bal); } Err(e) => { sleep(*polling_frequency); if now.elapsed() > *timeout { ThinClient::submit_poll_balance_metrics(&now.elapsed()); return Err(e); } } }; } } pub fn poll_get_balance(&mut self, pubkey: &Pubkey) -> io::Result { self.poll_balance_with_timeout(pubkey, &Duration::from_millis(100), &Duration::from_secs(1)) } /// Poll the server to confirm a transaction. pub fn poll_for_signature(&mut self, signature: &Signature) -> io::Result<()> { let now = Instant::now(); while !self.check_signature(signature) { if now.elapsed().as_secs() > 1 { // TODO: Return a better error. return Err(io::Error::new(io::ErrorKind::Other, "signature not found")); } sleep(Duration::from_millis(100)); } Ok(()) } /// Check a signature in the bank. This method blocks /// until the server sends a response. pub fn check_signature(&mut self, signature: &Signature) -> bool { trace!("check_signature"); let req = Request::GetSignature { signature: *signature, }; let data = serialize(&req).expect("serialize GetSignature in pub fn check_signature"); let now = Instant::now(); let mut done = false; while !done { self.requests_socket .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_last_id"); if let Ok(resp) = self.recv_response() { if let Response::SignatureStatus { .. } = resp { done = true; } self.process_response(&resp); } } metrics::submit( influxdb::Point::new("thinclient") .add_tag("op", influxdb::Value::String("check_signature".to_string())) .add_field( "duration_ms", influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64), ).to_owned(), ); self.signature_status } } impl Drop for ThinClient { fn drop(&mut self) { metrics::flush(); } } pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> Result { let exit = Arc::new(AtomicBool::new(false)); let (node, gossip_socket) = Crdt::spy_node(); let my_addr = gossip_socket.local_addr().unwrap(); let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new"))); let window = Arc::new(RwLock::new(vec![])); let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()); let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp); crdt.write().unwrap().insert(&leader_entry_point); sleep(Duration::from_millis(100)); let deadline = match timeout { Some(timeout) => Duration::new(timeout, 0), None => Duration::new(std::u64::MAX, 0), }; let now = Instant::now(); // Block until leader's correct contact info is received let leader; loop { trace!("polling {:?} for leader from {:?}", leader_ncp, my_addr); if let Some(l) = crdt.read().unwrap().leader_data() { leader = Some(l.clone()); break; } if log_enabled!(Level::Trace) { trace!("{}", crdt.read().unwrap().node_info_trace()); } if now.elapsed() > deadline { return Err(Error::CrdtError(CrdtError::NoLeader)); } sleep(Duration::from_millis(100)); } ncp.close()?; if log_enabled!(Level::Trace) { trace!("{}", crdt.read().unwrap().node_info_trace()); } Ok(leader.unwrap().clone()) } #[cfg(test)] mod tests { use super::*; use bank::Bank; use crdt::Node; use fullnode::Fullnode; use ledger::LedgerWriter; use logger; use mint::Mint; use signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; use system_program::SystemProgram; fn tmp_ledger(name: &str, mint: &Mint) -> String { use std::env; let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); let keypair = Keypair::new(); let path = format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey()); let mut writer = LedgerWriter::open(&path, true).unwrap(); writer.write_entries(mint.create_entries()).unwrap(); path } #[test] fn test_thin_client() { logger::setup(); let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let ledger_path = tmp_ledger("thin_client", &alice); let server = Fullnode::new_with_bank( leader_keypair, bank, 0, &[], leader, None, &ledger_path, false, None, Some(0), ); sleep(Duration::from_millis(900)); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( leader_data.contact_info.rpu, requests_socket, leader_data.contact_info.tpu, transactions_socket, ); let last_id = client.get_last_id(); let signature = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); client.poll_for_signature(&signature).unwrap(); let balance = client.get_balance(&bob_pubkey); assert_eq!(balance.unwrap(), 500); server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } // sleep(Duration::from_millis(300)); is unstable #[test] #[ignore] fn test_bad_sig() { logger::setup(); let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let leader_data = leader.info.clone(); let ledger_path = tmp_ledger("bad_sig", &alice); let server = Fullnode::new_with_bank( leader_keypair, bank, 0, &[], leader, None, &ledger_path, false, None, Some(0), ); //TODO: remove this sleep, or add a retry so CI is stable sleep(Duration::from_millis(300)); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); requests_socket .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( leader_data.contact_info.rpu, requests_socket, leader_data.contact_info.tpu, transactions_socket, ); let last_id = client.get_last_id(); let tx = Transaction::system_new(&alice.keypair(), bob_pubkey, 500, last_id); let _sig = client.transfer_signed(&tx).unwrap(); let last_id = client.get_last_id(); let mut tr2 = Transaction::system_new(&alice.keypair(), bob_pubkey, 501, last_id); let mut instruction2 = deserialize(&tr2.userdata).unwrap(); if let SystemProgram::Move { ref mut tokens } = instruction2 { *tokens = 502; } tr2.userdata = serialize(&instruction2).unwrap(); let signature = client.transfer_signed(&tr2).unwrap(); client.poll_for_signature(&signature).unwrap(); let balance = client.get_balance(&bob_pubkey); assert_eq!(balance.unwrap(), 500); server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } #[test] fn test_client_check_signature() { logger::setup(); let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let leader_data = leader.info.clone(); let ledger_path = tmp_ledger("client_check_signature", &alice); let server = Fullnode::new_with_bank( leader_keypair, bank, 0, &[], leader, None, &ledger_path, false, None, Some(0), ); sleep(Duration::from_millis(300)); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); requests_socket .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( leader_data.contact_info.rpu, requests_socket, leader_data.contact_info.tpu, transactions_socket, ); let last_id = client.get_last_id(); let signature = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); assert!(client.poll_for_signature(&signature).is_ok()); server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } #[test] fn test_transaction_count() { // set a bogus address, see that we don't hang logger::setup(); let addr = "0.0.0.0:1234".parse().unwrap(); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); requests_socket .set_read_timeout(Some(Duration::from_millis(250))) .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new(addr, requests_socket, addr, transactions_socket); assert_eq!(client.transaction_count(), 0); } #[test] fn test_zero_balance_after_nonzero() { logger::setup(); let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_keypair = Keypair::new(); let leader_data = leader.info.clone(); let ledger_path = tmp_ledger("zero_balance_check", &alice); let server = Fullnode::new_with_bank( leader_keypair, bank, 0, &[], leader, None, &ledger_path, false, None, Some(0), ); sleep(Duration::from_millis(900)); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); requests_socket .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( leader_data.contact_info.rpu, requests_socket, leader_data.contact_info.tpu, transactions_socket, ); let last_id = client.get_last_id(); // give bob 500 tokens let signature = client .transfer(500, &alice.keypair(), bob_keypair.pubkey(), &last_id) .unwrap(); assert!(client.poll_for_signature(&signature).is_ok()); let balance = client.poll_get_balance(&bob_keypair.pubkey()); assert!(balance.is_ok()); assert_eq!(balance.unwrap(), 500); // take them away let signature = client .transfer(500, &bob_keypair, alice.keypair().pubkey(), &last_id) .unwrap(); assert!(client.poll_for_signature(&signature).is_ok()); // should get an error when bob's account is purged let balance = client.poll_get_balance(&bob_keypair.pubkey()); assert!(balance.is_err()); server .close() .unwrap_or_else(|e| panic!("close() failed! {:?}", e)); remove_dir_all(ledger_path).unwrap(); } }