From 317031f4555ab2cce1c5cbfa9bf609fdd47e9426 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 06:49:48 -0600 Subject: [PATCH 1/7] Add transaction count to accountant --- src/accountant.rs | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 7409a6fb54..857dca6063 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -17,7 +17,7 @@ use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; use std::result; use std::sync::RwLock; -use std::sync::atomic::{AtomicIsize, Ordering}; +use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering}; use transaction::Transaction; pub const MAX_ENTRY_IDS: usize = 1024 * 4; @@ -59,6 +59,7 @@ pub struct Accountant { last_ids: RwLock>)>>, time_sources: RwLock>, last_time: RwLock>, + transaction_count: AtomicUsize, } impl Accountant { @@ -72,6 +73,7 @@ impl Accountant { last_ids: RwLock::new(VecDeque::new()), time_sources: RwLock::new(HashSet::new()), last_time: RwLock::new(Utc.timestamp(0, 0)), + transaction_count: AtomicUsize::new(0), } } @@ -188,7 +190,10 @@ impl Accountant { ); match result { - Ok(_) => return Ok(()), + Ok(_) => { + self.transaction_count.fetch_add(1, Ordering::Relaxed); + return Ok(()); + } Err(_) => continue, }; } @@ -387,6 +392,10 @@ impl Accountant { .expect("'balances' read lock in get_balance"); bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) } + + pub fn transaction_count(&self) -> usize { + self.transaction_count.load(Ordering::Relaxed) + } } #[cfg(test)] @@ -412,6 +421,7 @@ mod tests { .transfer(500, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_500); + assert_eq!(accountant.transaction_count(), 2); } #[test] @@ -422,6 +432,7 @@ mod tests { accountant.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()), Err(AccountingError::AccountNotFound) ); + assert_eq!(accountant.transaction_count(), 0); } #[test] @@ -432,10 +443,12 @@ mod tests { accountant .transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); + assert_eq!(accountant.transaction_count(), 1); assert_eq!( accountant.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()), Err(AccountingError::InsufficientFunds) ); + assert_eq!(accountant.transaction_count(), 1); let alice_pubkey = alice.keypair().pubkey(); assert_eq!(accountant.get_balance(&alice_pubkey).unwrap(), 10_000); @@ -468,6 +481,9 @@ mod tests { // Alice's balance will be zero because all funds are locked up. assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0)); + // tx count is 1, because debits were applied. + assert_eq!(accountant.transaction_count(), 1); + // Bob's balance will be None because the funds have not been // sent. assert_eq!(accountant.get_balance(&bob_pubkey), None); @@ -479,6 +495,10 @@ mod tests { .unwrap(); assert_eq!(accountant.get_balance(&bob_pubkey), Some(1)); + // tx count is still 1, because we chose not to count timestamp events + // tx count. + assert_eq!(accountant.transaction_count(), 1); + accountant .process_verified_timestamp(alice.pubkey(), dt) .unwrap(); // <-- Attack! Attempt to process completed transaction. @@ -516,6 +536,9 @@ mod tests { .transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) .unwrap(); + // Assert the debit counts as a transaction. + assert_eq!(accountant.transaction_count(), 1); + // Alice's balance will be zero because all funds are locked up. assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0)); @@ -530,6 +553,9 @@ mod tests { assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1)); assert_eq!(accountant.get_balance(&bob_pubkey), None); + // Assert cancel doesn't cause count to go backward. + assert_eq!(accountant.transaction_count(), 1); + accountant .process_verified_sig(alice.pubkey(), sig) .unwrap(); // <-- Attack! Attempt to cancel completed transaction. @@ -576,7 +602,11 @@ mod tests { let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); let trs = vec![tr0, tr1]; - assert!(accountant.process_verified_transactions(trs)[1].is_err()); + let results = accountant.process_verified_transactions(trs); + assert!(results[1].is_err()); + + // Assert bad transactions aren't counted. + assert_eq!(accountant.transaction_count(), 1); } } From 455050e19c2952b6378f368bf70cd6e7b586b43c Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 07:16:39 -0600 Subject: [PATCH 2/7] Expose the server-side transaction count --- src/request.rs | 2 ++ src/request_processor.rs | 6 ++++++ src/thin_client.rs | 28 ++++++++++++++++++++++++++++ 3 files changed, 36 insertions(+) diff --git a/src/request.rs b/src/request.rs index 7575010ad8..402aafc27b 100644 --- a/src/request.rs +++ b/src/request.rs @@ -12,6 +12,7 @@ use transaction::Transaction; pub enum Request { Transaction(Transaction), GetBalance { key: PublicKey }, + GetTransactionCount, Subscribe { subscriptions: Vec }, } @@ -40,6 +41,7 @@ impl Request { #[derive(Serialize, Deserialize, Debug)] pub enum Response { Balance { key: PublicKey, val: Option }, + TransactionCount { transaction_count: u64 }, EntryInfo(EntryInfo), } diff --git a/src/request_processor.rs b/src/request_processor.rs index 6f2f3bd7c9..ad7f76183b 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -46,6 +46,12 @@ impl RequestProcessor { info!("Response::Balance {:?}", rsp); Some(rsp) } + Request::GetTransactionCount => { + let transaction_count = self.accountant.transaction_count() as u64; + let rsp = (Response::TransactionCount { transaction_count }, rsp_addr); + info!("Response::TransactionCount {:?}", rsp); + Some(rsp) + } Request::Transaction(_) => unreachable!(), Request::Subscribe { subscriptions } => { for subscription in subscriptions { diff --git a/src/thin_client.rs b/src/thin_client.rs index 230b2d729e..b5337e75d1 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -19,6 +19,7 @@ pub struct ThinClient { pub events_socket: UdpSocket, last_id: Option, num_events: u64, + transaction_count: u64, balances: HashMap>, } @@ -33,6 +34,7 @@ impl ThinClient { events_socket, last_id: None, num_events: 0, + transaction_count: 0, balances: HashMap::new(), }; client.init(); @@ -62,6 +64,10 @@ impl ThinClient { info!("Response balance {:?} {:?}", key, val); self.balances.insert(key, val); } + Response::TransactionCount { transaction_count } => { + info!("Response transaction count {:?}", transaction_count); + self.transaction_count = transaction_count; + } Response::EntryInfo(entry_info) => { trace!("Response entry_info {:?}", entry_info.id); self.last_id = Some(entry_info.id); @@ -113,6 +119,28 @@ impl ThinClient { self.balances[pubkey].ok_or(io::Error::new(io::ErrorKind::Other, "nokey")) } + /// Request the transaction count. If the response packet is dropped by the network, + /// this method will hang. + pub fn server_transaction_count(&mut self) -> io::Result { + info!("server_transaction_count"); + let req = Request::GetTransactionCount; + let data = + serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count"); + self.requests_socket + .send_to(&data, &self.addr) + .expect("buffer error in pub fn transaction_count"); + let mut done = false; + while !done { + let resp = self.recv_response()?; + info!("recv_response {:?}", resp); + if let &Response::TransactionCount { .. } = &resp { + done = true; + } + self.process_response(resp); + } + Ok(self.transaction_count) + } + /// Request the last Entry ID from the server. This method blocks /// until the server sends a response. pub fn get_last_id(&mut self) -> FutureResult { From dfb754dd13b29af64396318a04feabe2ad0f8bd8 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 09:35:10 -0600 Subject: [PATCH 3/7] Revive GetLastId messages --- src/request.rs | 2 ++ src/request_processor.rs | 6 ++++++ src/thin_client.rs | 4 ++++ 3 files changed, 12 insertions(+) diff --git a/src/request.rs b/src/request.rs index 402aafc27b..6b07ff38a2 100644 --- a/src/request.rs +++ b/src/request.rs @@ -12,6 +12,7 @@ use transaction::Transaction; pub enum Request { Transaction(Transaction), GetBalance { key: PublicKey }, + GetLastId, GetTransactionCount, Subscribe { subscriptions: Vec }, } @@ -41,6 +42,7 @@ impl Request { #[derive(Serialize, Deserialize, Debug)] pub enum Response { Balance { key: PublicKey, val: Option }, + LastId { id: Hash }, TransactionCount { transaction_count: u64 }, EntryInfo(EntryInfo), } diff --git a/src/request_processor.rs b/src/request_processor.rs index ad7f76183b..f2b6c51e25 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -46,6 +46,12 @@ impl RequestProcessor { info!("Response::Balance {:?}", rsp); Some(rsp) } + Request::GetLastId => { + let id = self.accountant.last_id(); + let rsp = (Response::LastId { id }, rsp_addr); + info!("Response::LastId {:?}", rsp); + Some(rsp) + } Request::GetTransactionCount => { let transaction_count = self.accountant.transaction_count() as u64; let rsp = (Response::TransactionCount { transaction_count }, rsp_addr); diff --git a/src/thin_client.rs b/src/thin_client.rs index b5337e75d1..eee2fec17b 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -64,6 +64,10 @@ impl ThinClient { info!("Response balance {:?} {:?}", key, val); self.balances.insert(key, val); } + Response::LastId { id } => { + info!("Response last_id {:?}", id); + self.last_id = Some(id); + } Response::TransactionCount { transaction_count } => { info!("Response transaction count {:?}", transaction_count); self.transaction_count = transaction_count; From f168c377fdc6ba52bb5493d23d49531b1faa5d6f Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 09:40:29 -0600 Subject: [PATCH 4/7] Get last_id via GetLastId instead of EntryInfo --- src/thin_client.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/thin_client.rs b/src/thin_client.rs index eee2fec17b..d2d73f3fc9 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -148,8 +148,22 @@ impl ThinClient { /// Request the last Entry ID from the server. This method blocks /// until the server sends a response. pub fn get_last_id(&mut self) -> FutureResult { - self.transaction_count(); - ok(self.last_id.unwrap_or(Hash::default())) + info!("get_last_id"); + let req = Request::GetLastId; + let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id"); + self.requests_socket + .send_to(&data, &self.addr) + .expect("buffer error in pub fn get_last_id"); + let mut done = false; + while !done { + let resp = self.recv_response().expect("get_last_id response"); + info!("recv_response {:?}", resp); + if let &Response::LastId { .. } = &resp { + done = true; + } + self.process_response(resp); + } + ok(self.last_id.expect("some last_id")) } /// Return the number of transactions the server processed since creating From 5ba20a94e833c2ae17556debf94139cafd3c1572 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 09:43:40 -0600 Subject: [PATCH 5/7] Panic on error to get same signature as transaction_count() --- src/thin_client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/thin_client.rs b/src/thin_client.rs index d2d73f3fc9..6b1678bc2f 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -125,7 +125,7 @@ impl ThinClient { /// Request the transaction count. If the response packet is dropped by the network, /// this method will hang. - pub fn server_transaction_count(&mut self) -> io::Result { + pub fn server_transaction_count(&mut self) -> u64 { info!("server_transaction_count"); let req = Request::GetTransactionCount; let data = @@ -135,14 +135,14 @@ impl ThinClient { .expect("buffer error in pub fn transaction_count"); let mut done = false; while !done { - let resp = self.recv_response()?; + let resp = self.recv_response().expect("transaction count dropped"); info!("recv_response {:?}", resp); if let &Response::TransactionCount { .. } = &resp { done = true; } self.process_response(resp); } - Ok(self.transaction_count) + self.transaction_count } /// Request the last Entry ID from the server. This method blocks From 0ae69bdcd95e7d32b9480bee2335c08314d0caaa Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 09:45:09 -0600 Subject: [PATCH 6/7] Get transactionn_count via GetTransactionCount instead of EntryInfo --- src/thin_client.rs | 34 ++-------------------------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/src/thin_client.rs b/src/thin_client.rs index 6b1678bc2f..9a6be16024 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -125,8 +125,8 @@ impl ThinClient { /// Request the transaction count. If the response packet is dropped by the network, /// this method will hang. - pub fn server_transaction_count(&mut self) -> u64 { - info!("server_transaction_count"); + pub fn transaction_count(&mut self) -> u64 { + info!("transaction_count"); let req = Request::GetTransactionCount; let data = serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count"); @@ -165,36 +165,6 @@ impl ThinClient { } ok(self.last_id.expect("some last_id")) } - - /// Return the number of transactions the server processed since creating - /// this client instance. - pub fn transaction_count(&mut self) -> u64 { - // Wait for at least one EntryInfo. - let mut done = false; - while !done { - let resp = self.recv_response() - .expect("recv_response in pub fn transaction_count"); - if let &Response::EntryInfo(_) = &resp { - done = true; - } - self.process_response(resp); - } - - // Then take the rest. - self.requests_socket - .set_nonblocking(true) - .expect("set_nonblocking in pub fn transaction_count"); - loop { - match self.recv_response() { - Err(_) => break, - Ok(resp) => self.process_response(resp), - } - } - self.requests_socket - .set_nonblocking(false) - .expect("set_nonblocking in pub fn transaction_count"); - self.num_events - } } #[cfg(test)] From cc447c0fda1370e86ef9fbd7fe747e5a0d24d0a3 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 09:53:57 -0600 Subject: [PATCH 7/7] Drop support for EntryInfo subscriptions --- src/entry_writer.rs | 10 ++------- src/request.rs | 14 ------------- src/request_processor.rs | 45 ++++------------------------------------ src/rpu.rs | 4 +--- src/thin_client.rs | 18 +--------------- src/tvu.rs | 4 +--- 6 files changed, 9 insertions(+), 86 deletions(-) diff --git a/src/entry_writer.rs b/src/entry_writer.rs index f50a7d313b..ab973ba3bc 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -4,7 +4,6 @@ use accountant::Accountant; use entry::Entry; use ledger; use packet; -use request_processor::RequestProcessor; use result::Result; use serde_json; use std::collections::VecDeque; @@ -17,16 +16,12 @@ use streamer; pub struct EntryWriter<'a> { accountant: &'a Accountant, - request_processor: &'a RequestProcessor, } impl<'a> EntryWriter<'a> { /// Create a new Tpu that wraps the given Accountant. - pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self { - EntryWriter { - accountant, - request_processor, - } + pub fn new(accountant: &'a Accountant) -> Self { + EntryWriter { accountant } } fn write_entry(&self, writer: &Mutex, entry: &Entry) { @@ -37,7 +32,6 @@ impl<'a> EntryWriter<'a> { "{}", serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry") ).expect("writeln! in fn write_entry"); - self.request_processor.notify_entry_info_subscribers(&entry); } fn write_entries( diff --git a/src/request.rs b/src/request.rs index 6b07ff38a2..d1c692ae42 100644 --- a/src/request.rs +++ b/src/request.rs @@ -14,19 +14,6 @@ pub enum Request { GetBalance { key: PublicKey }, GetLastId, GetTransactionCount, - Subscribe { subscriptions: Vec }, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Subscription { - EntryInfo, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct EntryInfo { - pub id: Hash, - pub num_hashes: u64, - pub num_events: u64, } impl Request { @@ -44,7 +31,6 @@ pub enum Response { Balance { key: PublicKey, val: Option }, LastId { id: Hash }, TransactionCount { transaction_count: u64 }, - EntryInfo(EntryInfo), } pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { diff --git a/src/request_processor.rs b/src/request_processor.rs index f2b6c51e25..4e6c815edf 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -8,12 +8,12 @@ use event_processor::EventProcessor; use packet; use packet::SharedPackets; use rayon::prelude::*; -use request::{EntryInfo, Request, Response, Subscription}; +use request::{Request, Response}; use result::Result; use std::collections::VecDeque; -use std::net::{SocketAddr, UdpSocket}; +use std::net::SocketAddr; +use std::sync::Arc; use std::sync::mpsc::{Receiver, Sender}; -use std::sync::{Arc, Mutex}; use std::time::Duration; use std::time::Instant; use streamer; @@ -21,16 +21,12 @@ use timing; pub struct RequestProcessor { accountant: Arc, - entry_info_subscribers: Mutex>, } impl RequestProcessor { /// Create a new Tpu that wraps the given Accountant. pub fn new(accountant: Arc) -> Self { - RequestProcessor { - accountant, - entry_info_subscribers: Mutex::new(vec![]), - } + RequestProcessor { accountant } } /// Process Request items sent by clients. @@ -59,16 +55,6 @@ impl RequestProcessor { Some(rsp) } Request::Transaction(_) => unreachable!(), - Request::Subscribe { subscriptions } => { - for subscription in subscriptions { - match subscription { - Subscription::EntryInfo => { - self.entry_info_subscribers.lock().unwrap().push(rsp_addr) - } - } - } - None - } } } @@ -81,29 +67,6 @@ impl RequestProcessor { .collect() } - pub fn notify_entry_info_subscribers(&self, entry: &Entry) { - // TODO: No need to bind(). - let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); - - // copy subscribers to avoid taking lock while doing io - let addrs = self.entry_info_subscribers.lock().unwrap().clone(); - trace!("Sending to {} addrs", addrs.len()); - for addr in addrs { - let entry_info = EntryInfo { - id: entry.id, - num_hashes: entry.num_hashes, - num_events: entry.events.len() as u64, - }; - let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); - trace!("sending {} to {}", data.len(), addr); - //TODO dont do IO here, this needs to be on a separate channel - let res = socket.send_to(&data, addr); - if res.is_err() { - eprintln!("couldn't send response: {:?}", res); - } - } - } - fn deserialize_requests(p: &packet::Packets) -> Vec> { p.packets .par_iter() diff --git a/src/rpu.rs b/src/rpu.rs index 2cea96fab3..b8954c5542 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -33,7 +33,6 @@ impl Rpu { fn write_service( accountant: Arc, - request_processor: Arc, exit: Arc, broadcast: streamer::BlobSender, blob_recycler: packet::BlobRecycler, @@ -41,7 +40,7 @@ impl Rpu { entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - let entry_writer = EntryWriter::new(&accountant, &request_processor); + let entry_writer = EntryWriter::new(&accountant); let _ = entry_writer.write_and_send_entries( &broadcast, &blob_recycler, @@ -99,7 +98,6 @@ impl Rpu { let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( self.event_processor.accountant.clone(), - request_stage.request_processor.clone(), exit.clone(), broadcast_sender, blob_recycler.clone(), diff --git a/src/thin_client.rs b/src/thin_client.rs index 9a6be16024..2bc2908432 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -6,7 +6,7 @@ use bincode::{deserialize, serialize}; use futures::future::{ok, FutureResult}; use hash::Hash; -use request::{Request, Response, Subscription}; +use request::{Request, Response}; use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; @@ -18,7 +18,6 @@ pub struct ThinClient { pub requests_socket: UdpSocket, pub events_socket: UdpSocket, last_id: Option, - num_events: u64, transaction_count: u64, balances: HashMap>, } @@ -33,22 +32,12 @@ impl ThinClient { requests_socket, events_socket, last_id: None, - num_events: 0, transaction_count: 0, balances: HashMap::new(), }; - client.init(); client } - pub fn init(&self) { - let subscriptions = vec![Subscription::EntryInfo]; - let req = Request::Subscribe { subscriptions }; - let data = serialize(&req).expect("serialize Subscribe in thin_client"); - trace!("subscribing to {}", self.addr); - let _res = self.requests_socket.send_to(&data, &self.addr); - } - pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; info!("start recv_from"); @@ -72,11 +61,6 @@ impl ThinClient { info!("Response transaction count {:?}", transaction_count); self.transaction_count = transaction_count; } - Response::EntryInfo(entry_info) => { - trace!("Response entry_info {:?}", entry_info.id); - self.last_id = Some(entry_info.id); - self.num_events += entry_info.num_events; - } } } diff --git a/src/tvu.rs b/src/tvu.rs index d486a5888a..8deedbf23c 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -34,12 +34,11 @@ impl Tvu { fn drain_service( accountant: Arc, - request_processor: Arc, exit: Arc, entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || { - let entry_writer = EntryWriter::new(&accountant, &request_processor); + let entry_writer = EntryWriter::new(&accountant); loop { let _ = entry_writer.drain_entries(&entry_receiver); if exit.load(Ordering::Relaxed) { @@ -183,7 +182,6 @@ impl Tvu { let t_write = Self::drain_service( obj.event_processor.accountant.clone(), - request_stage.request_processor.clone(), exit.clone(), request_stage.entry_receiver, );