diff --git a/src/accountant.rs b/src/accountant.rs index 49c5d2c883..a94629837e 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -74,6 +74,13 @@ impl Accountant { acc } + /// Return the last entry ID registered + pub fn last_id(&self) -> Hash { + let last_ids = self.last_ids.read().unwrap(); + let last_item = last_ids.iter().last().expect("empty last_ids list"); + last_item.0 + } + fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> bool { if signatures.read().unwrap().contains(sig) { return false; @@ -327,6 +334,8 @@ mod tests { let alice = Mint::new(10_000); let bob_pubkey = KeyPair::new().pubkey(); let acc = Accountant::new(&alice); + assert_eq!(acc.last_id(), alice.last_id()); + acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index bd8c7ddd24..c08e86620d 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -33,7 +33,6 @@ use transaction::Transaction; pub struct AccountantSkel { acc: Mutex, - last_id: Mutex, historian_input: Mutex>, historian: Historian, entry_info_subscribers: Mutex>, @@ -44,7 +43,6 @@ pub struct AccountantSkel { pub enum Request { Transaction(Transaction), GetBalance { key: PublicKey }, - GetLastId, Subscribe { subscriptions: Vec }, } @@ -76,20 +74,13 @@ type SharedSkel = Arc; pub enum Response { Balance { key: PublicKey, val: Option }, EntryInfo(EntryInfo), - LastId { id: Hash }, } impl AccountantSkel { /// Create a new AccountantSkel that wraps the given Accountant. - pub fn new( - acc: Accountant, - last_id: Hash, - historian_input: SyncSender, - historian: Historian, - ) -> Self { + pub fn new(acc: Accountant, historian_input: SyncSender, historian: Historian) -> Self { AccountantSkel { acc: Mutex::new(acc), - last_id: Mutex::new(last_id), entry_info_subscribers: Mutex::new(vec![]), historian_input: Mutex::new(historian_input), historian, @@ -116,10 +107,7 @@ impl AccountantSkel { fn update_entry(obj: &SharedSkel, writer: &Arc>, entry: &Entry) { trace!("update_entry entry"); - let mut last_id_l = obj.last_id.lock().unwrap(); - *last_id_l = entry.id; - obj.acc.lock().unwrap().register_entry_id(&last_id_l); - drop(last_id_l); + obj.acc.lock().unwrap().register_entry_id(&entry.id); writeln!( writer.lock().unwrap(), "{}", @@ -226,12 +214,6 @@ impl AccountantSkel { let val = self.acc.lock().unwrap().get_balance(&key); Some((Response::Balance { key, val }, rsp_addr)) } - Request::GetLastId => Some(( - Response::LastId { - id: *self.last_id.lock().unwrap(), - }, - rsp_addr, - )), Request::Transaction(_) => unreachable!(), Request::Subscribe { subscriptions } => { for subscription in subscriptions { @@ -699,7 +681,7 @@ mod tests { let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address"); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let skel = AccountantSkel::new(acc, mint.last_id(), input, historian); + let skel = AccountantSkel::new(acc, input, historian); // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); @@ -740,7 +722,7 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc_skel = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian)); + let acc_skel = Arc::new(AccountantSkel::new(acc, input, historian)); let serve_addr = leader_serve.local_addr().unwrap(); let threads = AccountantSkel::serve( &acc_skel, @@ -858,7 +840,7 @@ mod tests { let acc = Accountant::new(&alice); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian)); + let acc = Arc::new(AccountantSkel::new(acc, input, historian)); let replicate_addr = target1_data.replicate_addr; let threads = AccountantSkel::replicate( &acc, @@ -1007,7 +989,7 @@ mod bench { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let skel = AccountantSkel::new(acc, mint.last_id(), input, historian); + let skel = AccountantSkel::new(acc, input, historian); let now = Instant::now(); assert!(skel.process_packets(req_vers).is_ok()); diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 15a6a69cb7..1797a53e1c 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -57,9 +57,6 @@ impl AccountantStub { Response::Balance { key, val } => { self.balances.insert(key, val); } - Response::LastId { id } => { - self.last_id = Some(id); - } Response::EntryInfo(entry_info) => { self.last_id = Some(entry_info.id); self.num_events += entry_info.num_events; @@ -109,23 +106,9 @@ impl AccountantStub { } /// Request the last Entry ID from the server. This method blocks - /// until the server sends a response. At the time of this writing, - /// it also has the side-effect of causing the server to log any - /// entries that have been published by the Historian. + /// until the server sends a response. pub fn get_last_id(&mut self) -> FutureResult { - let req = Request::GetLastId; - let data = serialize(&req).expect("serialize GetId"); - self.socket - .send_to(&data, &self.addr) - .expect("buffer error"); - let mut done = false; - while !done { - let resp = self.recv_response().expect("recv response"); - if let &Response::LastId { .. } = &resp { - done = true; - } - self.process_response(resp); - } + self.transaction_count(); ok(self.last_id.unwrap_or(Hash::default())) } @@ -192,7 +175,7 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian)); + let acc = Arc::new(AccountantSkel::new(acc, input, historian)); let threads = AccountantSkel::serve(&acc, d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 2c585e6f27..c12840e843 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -104,7 +104,7 @@ fn main() { let (input, event_receiver) = sync_channel(10_000); let historian = Historian::new(event_receiver, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); - let skel = Arc::new(AccountantSkel::new(acc, last_id, input, historian)); + let skel = Arc::new(AccountantSkel::new(acc, input, historian)); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); diff --git a/src/streamer.rs b/src/streamer.rs index 471f1f29ce..808eea1e76 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -438,8 +438,8 @@ mod test { use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; - use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver, - PacketReceiver}; + use streamer::{blob_receiver, receiver, responder, retransmitter, window}; + use streamer::{BlobReceiver, PacketReceiver}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 {