Intercept historian output from accounting stage

We were accessing the accountant from multiple stages just to
register the ID the historian adds to Events.

This change should cause a whole lot of Arcs and Mutexes to go away.
This commit is contained in:
Greg Fitzgerald 2018-05-09 12:00:34 -06:00
parent 6967cf7f86
commit 778bec0777
2 changed files with 29 additions and 17 deletions

View File

@ -5,36 +5,52 @@ use bincode::serialize;
use entry::Entry;
use event::Event;
use hash::Hash;
use historian::Historian;
use recorder::Signal;
use result::Result;
use signature::PublicKey;
use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc::Sender;
use std::sync::Mutex;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use transaction::Transaction;
pub struct AccountingStage {
pub output: Arc<Mutex<Receiver<Entry>>>,
entry_sender: Arc<Mutex<Sender<Entry>>>,
pub acc: Mutex<Accountant>,
historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>,
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
}
impl AccountingStage {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(acc: Accountant, historian_input: Sender<Signal>) -> Self {
pub fn new(acc: Accountant, historian_input: Sender<Signal>, historian: Historian) -> Self {
let (entry_sender, output) = channel();
AccountingStage {
output: Arc::new(Mutex::new(output)),
entry_sender: Arc::new(Mutex::new(entry_sender)),
acc: Mutex::new(acc),
entry_info_subscribers: Mutex::new(vec![]),
historian_input: Mutex::new(historian_input),
historian: Mutex::new(historian),
}
}
/// Process the transactions in parallel and then log the successful ones.
pub fn process_events(&self, events: Vec<Event>) -> Result<()> {
let results = self.acc.lock().unwrap().process_verified_events(events);
let acc = self.acc.lock().unwrap();
let historian = self.historian.lock().unwrap();
let results = acc.process_verified_events(events);
let events = results.into_iter().filter_map(|x| x.ok()).collect();
let sender = self.historian_input.lock().unwrap();
sender.send(Signal::Events(events))?;
// Wait for the historian to tag our Events with an ID and then register it.
let entry = historian.output.lock().unwrap().recv()?;
acc.register_entry_id(&entry.id);
self.entry_sender.lock().unwrap().send(entry)?;
debug!("after historian_input");
Ok(())
}
@ -156,7 +172,7 @@ mod tests {
let acc = Accountant::new(&mint);
let (input, event_receiver) = channel();
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let stage = AccountingStage::new(acc, input);
let stage = AccountingStage::new(acc, input, historian);
// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
@ -170,8 +186,8 @@ mod tests {
assert!(stage.process_events(events).is_ok());
// Collect the ledger and feed it to a new accountant.
drop(stage.historian_input);
let entries: Vec<Entry> = historian.output.lock().unwrap().iter().collect();
drop(stage.entry_sender);
let entries: Vec<Entry> = stage.output.lock().unwrap().iter().collect();
// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
@ -247,7 +263,7 @@ mod bench {
let (input, event_receiver) = channel();
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let stage = AccountingStage::new(acc, input);
let stage = AccountingStage::new(acc, input, historian);
let now = Instant::now();
assert!(stage.process_events(events).is_ok());
@ -257,7 +273,7 @@ mod bench {
// Ensure that all transactions were successfully logged.
drop(stage.historian_input);
let entries: Vec<Entry> = historian.output.lock().unwrap().iter().collect();
let entries: Vec<Entry> = stage.output.lock().unwrap().iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize);

View File

@ -32,7 +32,6 @@ use timing;
pub struct Tpu {
accounting: AccountingStage,
historian: Historian,
}
type SharedTpu = Arc<Tpu>;
@ -40,11 +39,8 @@ type SharedTpu = Arc<Tpu>;
impl Tpu {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(acc: Accountant, historian_input: Sender<Signal>, historian: Historian) -> Self {
let accounting = AccountingStage::new(acc, historian_input);
Tpu {
accounting,
historian,
}
let accounting = AccountingStage::new(acc, historian_input, historian);
Tpu { accounting }
}
fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
@ -65,14 +61,14 @@ impl Tpu {
fn receive_all<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
let mut l = vec![];
let entry = obj.historian
let entry = obj.accounting
.output
.lock()
.unwrap()
.recv_timeout(Duration::new(1, 0))?;
Self::update_entry(obj, writer, &entry);
l.push(entry);
while let Ok(entry) = obj.historian.receive() {
while let Ok(entry) = obj.accounting.output.lock().unwrap().try_recv() {
Self::update_entry(obj, writer, &entry);
l.push(entry);
}