diff --git a/src/accountant.rs b/src/accountant.rs index 7409a6fb54..857dca6063 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -17,7 +17,7 @@ use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; use std::result; use std::sync::RwLock; -use std::sync::atomic::{AtomicIsize, Ordering}; +use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering}; use transaction::Transaction; pub const MAX_ENTRY_IDS: usize = 1024 * 4; @@ -59,6 +59,7 @@ pub struct Accountant { last_ids: RwLock>)>>, time_sources: RwLock>, last_time: RwLock>, + transaction_count: AtomicUsize, } impl Accountant { @@ -72,6 +73,7 @@ impl Accountant { last_ids: RwLock::new(VecDeque::new()), time_sources: RwLock::new(HashSet::new()), last_time: RwLock::new(Utc.timestamp(0, 0)), + transaction_count: AtomicUsize::new(0), } } @@ -188,7 +190,10 @@ impl Accountant { ); match result { - Ok(_) => return Ok(()), + Ok(_) => { + self.transaction_count.fetch_add(1, Ordering::Relaxed); + return Ok(()); + } Err(_) => continue, }; } @@ -387,6 +392,10 @@ impl Accountant { .expect("'balances' read lock in get_balance"); bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) } + + pub fn transaction_count(&self) -> usize { + self.transaction_count.load(Ordering::Relaxed) + } } #[cfg(test)] @@ -412,6 +421,7 @@ mod tests { .transfer(500, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_500); + assert_eq!(accountant.transaction_count(), 2); } #[test] @@ -422,6 +432,7 @@ mod tests { accountant.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()), Err(AccountingError::AccountNotFound) ); + assert_eq!(accountant.transaction_count(), 0); } #[test] @@ -432,10 +443,12 @@ mod tests { accountant .transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); + assert_eq!(accountant.transaction_count(), 1); assert_eq!( accountant.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()), Err(AccountingError::InsufficientFunds) ); + assert_eq!(accountant.transaction_count(), 1); let alice_pubkey = alice.keypair().pubkey(); assert_eq!(accountant.get_balance(&alice_pubkey).unwrap(), 10_000); @@ -468,6 +481,9 @@ mod tests { // Alice's balance will be zero because all funds are locked up. assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0)); + // tx count is 1, because debits were applied. + assert_eq!(accountant.transaction_count(), 1); + // Bob's balance will be None because the funds have not been // sent. assert_eq!(accountant.get_balance(&bob_pubkey), None); @@ -479,6 +495,10 @@ mod tests { .unwrap(); assert_eq!(accountant.get_balance(&bob_pubkey), Some(1)); + // tx count is still 1, because we chose not to count timestamp events + // tx count. + assert_eq!(accountant.transaction_count(), 1); + accountant .process_verified_timestamp(alice.pubkey(), dt) .unwrap(); // <-- Attack! Attempt to process completed transaction. @@ -516,6 +536,9 @@ mod tests { .transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) .unwrap(); + // Assert the debit counts as a transaction. + assert_eq!(accountant.transaction_count(), 1); + // Alice's balance will be zero because all funds are locked up. assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0)); @@ -530,6 +553,9 @@ mod tests { assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1)); assert_eq!(accountant.get_balance(&bob_pubkey), None); + // Assert cancel doesn't cause count to go backward. + assert_eq!(accountant.transaction_count(), 1); + accountant .process_verified_sig(alice.pubkey(), sig) .unwrap(); // <-- Attack! Attempt to cancel completed transaction. @@ -576,7 +602,11 @@ mod tests { let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); let trs = vec![tr0, tr1]; - assert!(accountant.process_verified_transactions(trs)[1].is_err()); + let results = accountant.process_verified_transactions(trs); + assert!(results[1].is_err()); + + // Assert bad transactions aren't counted. + assert_eq!(accountant.transaction_count(), 1); } } diff --git a/src/entry_writer.rs b/src/entry_writer.rs index f50a7d313b..ab973ba3bc 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -4,7 +4,6 @@ use accountant::Accountant; use entry::Entry; use ledger; use packet; -use request_processor::RequestProcessor; use result::Result; use serde_json; use std::collections::VecDeque; @@ -17,16 +16,12 @@ use streamer; pub struct EntryWriter<'a> { accountant: &'a Accountant, - request_processor: &'a RequestProcessor, } impl<'a> EntryWriter<'a> { /// Create a new Tpu that wraps the given Accountant. - pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self { - EntryWriter { - accountant, - request_processor, - } + pub fn new(accountant: &'a Accountant) -> Self { + EntryWriter { accountant } } fn write_entry(&self, writer: &Mutex, entry: &Entry) { @@ -37,7 +32,6 @@ impl<'a> EntryWriter<'a> { "{}", serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry") ).expect("writeln! in fn write_entry"); - self.request_processor.notify_entry_info_subscribers(&entry); } fn write_entries( diff --git a/src/request.rs b/src/request.rs index 7575010ad8..d1c692ae42 100644 --- a/src/request.rs +++ b/src/request.rs @@ -12,19 +12,8 @@ use transaction::Transaction; pub enum Request { Transaction(Transaction), GetBalance { key: PublicKey }, - Subscribe { subscriptions: Vec }, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Subscription { - EntryInfo, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct EntryInfo { - pub id: Hash, - pub num_hashes: u64, - pub num_events: u64, + GetLastId, + GetTransactionCount, } impl Request { @@ -40,7 +29,8 @@ impl Request { #[derive(Serialize, Deserialize, Debug)] pub enum Response { Balance { key: PublicKey, val: Option }, - EntryInfo(EntryInfo), + LastId { id: Hash }, + TransactionCount { transaction_count: u64 }, } pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { diff --git a/src/request_processor.rs b/src/request_processor.rs index 6f2f3bd7c9..4e6c815edf 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -8,12 +8,12 @@ use event_processor::EventProcessor; use packet; use packet::SharedPackets; use rayon::prelude::*; -use request::{EntryInfo, Request, Response, Subscription}; +use request::{Request, Response}; use result::Result; use std::collections::VecDeque; -use std::net::{SocketAddr, UdpSocket}; +use std::net::SocketAddr; +use std::sync::Arc; use std::sync::mpsc::{Receiver, Sender}; -use std::sync::{Arc, Mutex}; use std::time::Duration; use std::time::Instant; use streamer; @@ -21,16 +21,12 @@ use timing; pub struct RequestProcessor { accountant: Arc, - entry_info_subscribers: Mutex>, } impl RequestProcessor { /// Create a new Tpu that wraps the given Accountant. pub fn new(accountant: Arc) -> Self { - RequestProcessor { - accountant, - entry_info_subscribers: Mutex::new(vec![]), - } + RequestProcessor { accountant } } /// Process Request items sent by clients. @@ -46,17 +42,19 @@ impl RequestProcessor { info!("Response::Balance {:?}", rsp); Some(rsp) } - Request::Transaction(_) => unreachable!(), - Request::Subscribe { subscriptions } => { - for subscription in subscriptions { - match subscription { - Subscription::EntryInfo => { - self.entry_info_subscribers.lock().unwrap().push(rsp_addr) - } - } - } - None + Request::GetLastId => { + let id = self.accountant.last_id(); + let rsp = (Response::LastId { id }, rsp_addr); + info!("Response::LastId {:?}", rsp); + Some(rsp) } + Request::GetTransactionCount => { + let transaction_count = self.accountant.transaction_count() as u64; + let rsp = (Response::TransactionCount { transaction_count }, rsp_addr); + info!("Response::TransactionCount {:?}", rsp); + Some(rsp) + } + Request::Transaction(_) => unreachable!(), } } @@ -69,29 +67,6 @@ impl RequestProcessor { .collect() } - pub fn notify_entry_info_subscribers(&self, entry: &Entry) { - // TODO: No need to bind(). - let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); - - // copy subscribers to avoid taking lock while doing io - let addrs = self.entry_info_subscribers.lock().unwrap().clone(); - trace!("Sending to {} addrs", addrs.len()); - for addr in addrs { - let entry_info = EntryInfo { - id: entry.id, - num_hashes: entry.num_hashes, - num_events: entry.events.len() as u64, - }; - let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); - trace!("sending {} to {}", data.len(), addr); - //TODO dont do IO here, this needs to be on a separate channel - let res = socket.send_to(&data, addr); - if res.is_err() { - eprintln!("couldn't send response: {:?}", res); - } - } - } - fn deserialize_requests(p: &packet::Packets) -> Vec> { p.packets .par_iter() diff --git a/src/rpu.rs b/src/rpu.rs index 2cea96fab3..b8954c5542 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -33,7 +33,6 @@ impl Rpu { fn write_service( accountant: Arc, - request_processor: Arc, exit: Arc, broadcast: streamer::BlobSender, blob_recycler: packet::BlobRecycler, @@ -41,7 +40,7 @@ impl Rpu { entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - let entry_writer = EntryWriter::new(&accountant, &request_processor); + let entry_writer = EntryWriter::new(&accountant); let _ = entry_writer.write_and_send_entries( &broadcast, &blob_recycler, @@ -99,7 +98,6 @@ impl Rpu { let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( self.event_processor.accountant.clone(), - request_stage.request_processor.clone(), exit.clone(), broadcast_sender, blob_recycler.clone(), diff --git a/src/thin_client.rs b/src/thin_client.rs index 230b2d729e..2bc2908432 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -6,7 +6,7 @@ use bincode::{deserialize, serialize}; use futures::future::{ok, FutureResult}; use hash::Hash; -use request::{Request, Response, Subscription}; +use request::{Request, Response}; use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; @@ -18,7 +18,7 @@ pub struct ThinClient { pub requests_socket: UdpSocket, pub events_socket: UdpSocket, last_id: Option, - num_events: u64, + transaction_count: u64, balances: HashMap>, } @@ -32,21 +32,12 @@ impl ThinClient { requests_socket, events_socket, last_id: None, - num_events: 0, + transaction_count: 0, balances: HashMap::new(), }; - client.init(); client } - pub fn init(&self) { - let subscriptions = vec![Subscription::EntryInfo]; - let req = Request::Subscribe { subscriptions }; - let data = serialize(&req).expect("serialize Subscribe in thin_client"); - trace!("subscribing to {}", self.addr); - let _res = self.requests_socket.send_to(&data, &self.addr); - } - pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; info!("start recv_from"); @@ -62,10 +53,13 @@ impl ThinClient { info!("Response balance {:?} {:?}", key, val); self.balances.insert(key, val); } - Response::EntryInfo(entry_info) => { - trace!("Response entry_info {:?}", entry_info.id); - self.last_id = Some(entry_info.id); - self.num_events += entry_info.num_events; + Response::LastId { id } => { + info!("Response last_id {:?}", id); + self.last_id = Some(id); + } + Response::TransactionCount { transaction_count } => { + info!("Response transaction count {:?}", transaction_count); + self.transaction_count = transaction_count; } } } @@ -113,41 +107,47 @@ impl ThinClient { self.balances[pubkey].ok_or(io::Error::new(io::ErrorKind::Other, "nokey")) } - /// Request the last Entry ID from the server. This method blocks - /// until the server sends a response. - pub fn get_last_id(&mut self) -> FutureResult { - self.transaction_count(); - ok(self.last_id.unwrap_or(Hash::default())) - } - - /// Return the number of transactions the server processed since creating - /// this client instance. + /// Request the transaction count. If the response packet is dropped by the network, + /// this method will hang. pub fn transaction_count(&mut self) -> u64 { - // Wait for at least one EntryInfo. + info!("transaction_count"); + let req = Request::GetTransactionCount; + let data = + serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count"); + self.requests_socket + .send_to(&data, &self.addr) + .expect("buffer error in pub fn transaction_count"); let mut done = false; while !done { - let resp = self.recv_response() - .expect("recv_response in pub fn transaction_count"); - if let &Response::EntryInfo(_) = &resp { + let resp = self.recv_response().expect("transaction count dropped"); + info!("recv_response {:?}", resp); + if let &Response::TransactionCount { .. } = &resp { done = true; } self.process_response(resp); } + self.transaction_count + } - // Then take the rest. + /// Request the last Entry ID from the server. This method blocks + /// until the server sends a response. + pub fn get_last_id(&mut self) -> FutureResult { + info!("get_last_id"); + let req = Request::GetLastId; + let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id"); self.requests_socket - .set_nonblocking(true) - .expect("set_nonblocking in pub fn transaction_count"); - loop { - match self.recv_response() { - Err(_) => break, - Ok(resp) => self.process_response(resp), + .send_to(&data, &self.addr) + .expect("buffer error in pub fn get_last_id"); + let mut done = false; + while !done { + let resp = self.recv_response().expect("get_last_id response"); + info!("recv_response {:?}", resp); + if let &Response::LastId { .. } = &resp { + done = true; } + self.process_response(resp); } - self.requests_socket - .set_nonblocking(false) - .expect("set_nonblocking in pub fn transaction_count"); - self.num_events + ok(self.last_id.expect("some last_id")) } } diff --git a/src/tvu.rs b/src/tvu.rs index d486a5888a..8deedbf23c 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -34,12 +34,11 @@ impl Tvu { fn drain_service( accountant: Arc, - request_processor: Arc, exit: Arc, entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || { - let entry_writer = EntryWriter::new(&accountant, &request_processor); + let entry_writer = EntryWriter::new(&accountant); loop { let _ = entry_writer.drain_entries(&entry_receiver); if exit.load(Ordering::Relaxed) { @@ -183,7 +182,6 @@ impl Tvu { let t_write = Self::drain_service( obj.event_processor.accountant.clone(), - request_stage.request_processor.clone(), exit.clone(), request_stage.entry_receiver, );