From f107c6c2ca9238a81dd1594a4dfdf94d618d28f1 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 9 May 2018 14:21:42 -0600 Subject: [PATCH] Don't wrap thread-safe objects with mutexes --- src/accounting_stage.rs | 11 +++++------ src/tpu.rs | 12 ++++-------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/src/accounting_stage.rs b/src/accounting_stage.rs index 5de02a6565..27dc1472be 100644 --- a/src/accounting_stage.rs +++ b/src/accounting_stage.rs @@ -17,7 +17,7 @@ use transaction::Transaction; pub struct AccountingStage { pub output: Arc>>, entry_sender: Arc>>, - pub acc: Mutex, + pub acc: Accountant, historian_input: Mutex>, historian: Mutex, entry_info_subscribers: Mutex>, @@ -32,7 +32,7 @@ impl AccountingStage { AccountingStage { output: Arc::new(Mutex::new(output)), entry_sender: Arc::new(Mutex::new(entry_sender)), - acc: Mutex::new(acc), + acc, entry_info_subscribers: Mutex::new(vec![]), historian_input: Mutex::new(historian_input), historian: Mutex::new(historian), @@ -41,16 +41,15 @@ impl AccountingStage { /// Process the transactions in parallel and then log the successful ones. pub fn process_events(&self, events: Vec) -> Result<()> { - let acc = self.acc.lock().unwrap(); let historian = self.historian.lock().unwrap(); - let results = acc.process_verified_events(events); + let results = self.acc.process_verified_events(events); let events = results.into_iter().filter_map(|x| x.ok()).collect(); let sender = self.historian_input.lock().unwrap(); sender.send(Signal::Events(events))?; // Wait for the historian to tag our Events with an ID and then register it. let entry = historian.output.lock().unwrap().recv()?; - acc.register_entry_id(&entry.id); + self.acc.register_entry_id(&entry.id); self.entry_sender.lock().unwrap().send(entry)?; debug!("after historian_input"); @@ -65,7 +64,7 @@ impl AccountingStage { ) -> Option<(Response, SocketAddr)> { match msg { Request::GetBalance { key } => { - let val = self.acc.lock().unwrap().get_balance(&key); + let val = self.acc.get_balance(&key); let rsp = (Response::Balance { key, val }, rsp_addr); info!("Response::Balance {:?}", rsp); Some(rsp) diff --git a/src/tpu.rs b/src/tpu.rs index d79caecd19..96984838b4 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -41,11 +41,7 @@ impl Tpu { fn update_entry(obj: &SharedTpu, writer: &Arc>, entry: &Entry) { trace!("update_entry entry"); - obj.accounting - .acc - .lock() - .unwrap() - .register_entry_id(&entry.id); + obj.accounting.acc.register_entry_id(&entry.id); writeln!( writer.lock().unwrap(), "{}", @@ -374,7 +370,7 @@ impl Tpu { for msgs in &blobs { let blob = msgs.read().unwrap(); let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); - let acc = obj.accounting.acc.lock().unwrap(); + let acc = &obj.accounting.acc; for entry in entries { acc.register_entry_id(&entry.id); for result in acc.process_verified_events(entry.events) { @@ -809,7 +805,7 @@ mod tests { w.set_index(i).unwrap(); w.set_id(leader_id).unwrap(); - let acc = tpu.accounting.acc.lock().unwrap(); + let acc = &tpu.accounting.acc; let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); @@ -851,7 +847,7 @@ mod tests { msgs.push(msg); } - let acc = tpu.accounting.acc.lock().unwrap(); + let acc = &tpu.accounting.acc; let alice_balance = acc.get_balance(&alice.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance);