Don't wrap thread-safe objects with mutexes

This commit is contained in:
Greg Fitzgerald 2018-05-09 14:21:42 -06:00
parent 7daf14caa7
commit f107c6c2ca
2 changed files with 9 additions and 14 deletions

View File

@ -17,7 +17,7 @@ use transaction::Transaction;
pub struct AccountingStage { pub struct AccountingStage {
pub output: Arc<Mutex<Receiver<Entry>>>, pub output: Arc<Mutex<Receiver<Entry>>>,
entry_sender: Arc<Mutex<Sender<Entry>>>, entry_sender: Arc<Mutex<Sender<Entry>>>,
pub acc: Mutex<Accountant>, pub acc: Accountant,
historian_input: Mutex<Sender<Signal>>, historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>, historian: Mutex<Historian>,
entry_info_subscribers: Mutex<Vec<SocketAddr>>, entry_info_subscribers: Mutex<Vec<SocketAddr>>,
@ -32,7 +32,7 @@ impl AccountingStage {
AccountingStage { AccountingStage {
output: Arc::new(Mutex::new(output)), output: Arc::new(Mutex::new(output)),
entry_sender: Arc::new(Mutex::new(entry_sender)), entry_sender: Arc::new(Mutex::new(entry_sender)),
acc: Mutex::new(acc), acc,
entry_info_subscribers: Mutex::new(vec![]), entry_info_subscribers: Mutex::new(vec![]),
historian_input: Mutex::new(historian_input), historian_input: Mutex::new(historian_input),
historian: Mutex::new(historian), historian: Mutex::new(historian),
@ -41,16 +41,15 @@ impl AccountingStage {
/// Process the transactions in parallel and then log the successful ones. /// Process the transactions in parallel and then log the successful ones.
pub fn process_events(&self, events: Vec<Event>) -> Result<()> { pub fn process_events(&self, events: Vec<Event>) -> Result<()> {
let acc = self.acc.lock().unwrap();
let historian = self.historian.lock().unwrap(); let historian = self.historian.lock().unwrap();
let results = acc.process_verified_events(events); let results = self.acc.process_verified_events(events);
let events = results.into_iter().filter_map(|x| x.ok()).collect(); let events = results.into_iter().filter_map(|x| x.ok()).collect();
let sender = self.historian_input.lock().unwrap(); let sender = self.historian_input.lock().unwrap();
sender.send(Signal::Events(events))?; sender.send(Signal::Events(events))?;
// Wait for the historian to tag our Events with an ID and then register it. // Wait for the historian to tag our Events with an ID and then register it.
let entry = historian.output.lock().unwrap().recv()?; let entry = historian.output.lock().unwrap().recv()?;
acc.register_entry_id(&entry.id); self.acc.register_entry_id(&entry.id);
self.entry_sender.lock().unwrap().send(entry)?; self.entry_sender.lock().unwrap().send(entry)?;
debug!("after historian_input"); debug!("after historian_input");
@ -65,7 +64,7 @@ impl AccountingStage {
) -> Option<(Response, SocketAddr)> { ) -> Option<(Response, SocketAddr)> {
match msg { match msg {
Request::GetBalance { key } => { Request::GetBalance { key } => {
let val = self.acc.lock().unwrap().get_balance(&key); let val = self.acc.get_balance(&key);
let rsp = (Response::Balance { key, val }, rsp_addr); let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp); info!("Response::Balance {:?}", rsp);
Some(rsp) Some(rsp)

View File

@ -41,11 +41,7 @@ impl Tpu {
fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) { fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
trace!("update_entry entry"); trace!("update_entry entry");
obj.accounting obj.accounting.acc.register_entry_id(&entry.id);
.acc
.lock()
.unwrap()
.register_entry_id(&entry.id);
writeln!( writeln!(
writer.lock().unwrap(), writer.lock().unwrap(),
"{}", "{}",
@ -374,7 +370,7 @@ impl Tpu {
for msgs in &blobs { for msgs in &blobs {
let blob = msgs.read().unwrap(); let blob = msgs.read().unwrap();
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap(); let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
let acc = obj.accounting.acc.lock().unwrap(); let acc = &obj.accounting.acc;
for entry in entries { for entry in entries {
acc.register_entry_id(&entry.id); acc.register_entry_id(&entry.id);
for result in acc.process_verified_events(entry.events) { for result in acc.process_verified_events(entry.events) {
@ -809,7 +805,7 @@ mod tests {
w.set_index(i).unwrap(); w.set_index(i).unwrap();
w.set_id(leader_id).unwrap(); w.set_id(leader_id).unwrap();
let acc = tpu.accounting.acc.lock().unwrap(); let acc = &tpu.accounting.acc;
let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]);
@ -851,7 +847,7 @@ mod tests {
msgs.push(msg); msgs.push(msg);
} }
let acc = tpu.accounting.acc.lock().unwrap(); let acc = &tpu.accounting.acc;
let alice_balance = acc.get_balance(&alice.keypair().pubkey()).unwrap(); let alice_balance = acc.get_balance(&alice.keypair().pubkey()).unwrap();
assert_eq!(alice_balance, alice_ref_balance); assert_eq!(alice_balance, alice_ref_balance);