diff --git a/src/entry_writer.rs b/src/entry_writer.rs
index 42fe5d53fc..24a13e1792 100644
--- a/src/entry_writer.rs
+++ b/src/entry_writer.rs
@@ -10,6 +10,7 @@ use serde_json;
 use std::collections::VecDeque;
 use std::io::Write;
 use std::io::sink;
+use std::sync::mpsc::Receiver;
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 use streamer;
@@ -42,22 +43,17 @@ impl<'a> EntryWriter<'a> {
         self.request_processor.notify_entry_info_subscribers(&entry);
     }
 
-    fn write_entries<W: Write>(&self, writer: &Mutex<W>) -> Result<Vec<Entry>> {
+    fn write_entries<W: Write>(
+        &self,
+        writer: &Mutex<W>,
+        entry_receiver: &Receiver<Entry>,
+    ) -> Result<Vec<Entry>> {
         //TODO implement a serialize for channel that does this without allocations
         let mut l = vec![];
-        let entry = self.event_processor
-            .output
-            .lock()
-            .expect("'ouput' lock in fn receive_all")
-            .recv_timeout(Duration::new(1, 0))?;
+        let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?;
         self.write_entry(writer, &entry);
         l.push(entry);
-        while let Ok(entry) = self.event_processor
-            .output
-            .lock()
-            .expect("'output' lock in fn write_entries")
-            .try_recv()
-        {
+        while let Ok(entry) = entry_receiver.try_recv() {
             self.write_entry(writer, &entry);
             l.push(entry);
         }
@@ -71,9 +67,10 @@ impl<'a> EntryWriter<'a> {
         broadcast: &streamer::BlobSender,
         blob_recycler: &packet::BlobRecycler,
         writer: &Mutex<W>,
+        entry_receiver: &Receiver<Entry>,
     ) -> Result<()> {
         let mut q = VecDeque::new();
-        let list = self.write_entries(writer)?;
+        let list = self.write_entries(writer, entry_receiver)?;
         trace!("New blobs? {}", list.len());
         ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
         if !q.is_empty() {
@@ -84,8 +81,8 @@ impl<'a> EntryWriter<'a> {
 
     /// Process any Entry items that have been published by the Historian.
     /// continuosly broadcast blobs of entries out
-    pub fn drain_entries(&self) -> Result<()> {
-        self.write_entries(&Arc::new(Mutex::new(sink())))?;
+    pub fn drain_entries(&self, entry_receiver: &Receiver<Entry>) -> Result<()> {
+        self.write_entries(&Arc::new(Mutex::new(sink())), entry_receiver)?;
         Ok(())
     }
 }
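The core of the `write_entries` change is the receive pattern: block briefly for the first entry, then drain whatever else is already queued without blocking again. A minimal, self-contained sketch of that pattern, using `String` as a stand-in for `Entry` (the names here are illustrative, not code from this repository):

```rust
// Standalone sketch of the recv_timeout / try_recv batching pattern that the
// new write_entries uses, with String standing in for Entry.
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

fn drain_batch(receiver: &Receiver<String>) -> Result<Vec<String>, RecvTimeoutError> {
    let mut batch = vec![];
    // Wait up to one second for the first entry, mirroring recv_timeout(Duration::new(1, 0)).
    let first = receiver.recv_timeout(Duration::new(1, 0))?;
    batch.push(first);
    // Opportunistically pull anything else that has already arrived.
    while let Ok(entry) = receiver.try_recv() {
        batch.push(entry);
    }
    Ok(batch)
}

fn main() {
    let (sender, receiver) = channel();
    for i in 0..3 {
        sender.send(format!("entry {}", i)).unwrap();
    }
    let batch = drain_batch(&receiver).unwrap();
    assert_eq!(batch.len(), 3);
}
```

Taking the `Receiver` as a parameter, instead of locking the shared `event_processor.output` field on every receive, keeps the channel end owned by a single consumer and removes the repeated `Mutex` acquisitions from this loop.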
diff --git a/src/event_processor.rs b/src/event_processor.rs
index df547af3d1..19f69c9d6c 100644
--- a/src/event_processor.rs
+++ b/src/event_processor.rs
@@ -7,12 +7,10 @@ use hash::Hash;
 use historian::Historian;
 use recorder::Signal;
 use result::Result;
-use std::sync::mpsc::{channel, Receiver, Sender};
+use std::sync::mpsc::{channel, Sender};
 use std::sync::{Arc, Mutex};
 
 pub struct EventProcessor {
-    pub output: Mutex<Receiver<Entry>>,
-    entry_sender: Mutex<Sender<Entry>>,
     pub accountant: Arc<Accountant>,
     historian_input: Mutex<Sender<Signal>>,
     historian: Mutex<Historian>,
@@ -23,10 +21,7 @@ impl EventProcessor {
     pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
         let (historian_input, event_receiver) = channel();
         let historian = Historian::new(event_receiver, start_hash, ms_per_tick);
-        let (entry_sender, output) = channel();
         EventProcessor {
-            output: Mutex::new(output),
-            entry_sender: Mutex::new(entry_sender),
             accountant: Arc::new(accountant),
             historian_input: Mutex::new(historian_input),
             historian: Mutex::new(historian),
         }
     }
 
     /// Process the transactions in parallel and then log the successful ones.
-    pub fn process_events(&self, events: Vec<Event>) -> Result<()> {
+    pub fn process_events(&self, events: Vec<Event>) -> Result<Entry> {
         let historian = self.historian.lock().unwrap();
         let results = self.accountant.process_verified_events(events);
         let events = results.into_iter().filter_map(|x| x.ok()).collect();
@@ -44,15 +39,13 @@ impl EventProcessor {
         // Wait for the historian to tag our Events with an ID and then register it.
         let entry = historian.output.lock().unwrap().recv()?;
         self.accountant.register_entry_id(&entry.id);
-        self.entry_sender.lock().unwrap().send(entry)?;
-        Ok(())
+        Ok(entry)
     }
 }
 
 #[cfg(test)]
 mod tests {
     use accountant::Accountant;
-    use entry::Entry;
     use event::Event;
     use event_processor::EventProcessor;
     use mint::Mint;
@@ -60,6 +53,8 @@ mod tests {
     use transaction::Transaction;
 
     #[test]
+    // TODO: Move this test accounting_stage. Calling process_events() directly
+    // defeats the purpose of this test.
     fn test_accounting_sequential_consistency() {
         // In this attack we'll demonstrate that a verifier can interpret the ledger
         // differently if either the server doesn't signal the ledger to add an
@@ -72,16 +67,15 @@ mod tests {
         let alice = KeyPair::new();
         let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
         let events = vec![Event::Transaction(tr)];
-        assert!(event_processor.process_events(events).is_ok());
+        let entry0 = event_processor.process_events(events).unwrap();
 
         // Process a second batch that spends one of those tokens.
         let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
         let events = vec![Event::Transaction(tr)];
-        assert!(event_processor.process_events(events).is_ok());
+        let entry1 = event_processor.process_events(events).unwrap();
 
         // Collect the ledger and feed it to a new accountant.
-        drop(event_processor.entry_sender);
-        let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();
+        let entries = vec![entry0, entry1];
 
         // Assert the user holds one token, not two. If the server only output one
         // entry, then the second transaction will be rejected, because it drives
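`process_events` now hands the produced `Entry` back to the caller instead of pushing it into a channel pair owned by `EventProcessor`. A toy sketch (illustrative types, not repository code) of that shape, with the caller owning the channel and deciding where the entry goes:

```rust
// Toy sketch of the new shape of process_events: the processor returns the
// entry it produced, so it no longer carries Mutex-wrapped Sender/Receiver fields.
use std::sync::mpsc::{channel, SendError, Sender};

struct Entry {
    id: u64,
}

struct Processor {
    next_id: u64,
}

impl Processor {
    // Previously this would have pushed the entry into an internal channel and
    // returned (); now the value is handed straight back to the caller.
    fn process(&mut self) -> Entry {
        let entry = Entry { id: self.next_id };
        self.next_id += 1;
        entry
    }
}

// The caller, not the processor, decides where the entry goes.
fn handle_batch(processor: &mut Processor, entry_sender: &Sender<Entry>) -> Result<(), SendError<Entry>> {
    entry_sender.send(processor.process())
}

fn main() {
    let mut processor = Processor { next_id: 0 };
    let (entry_sender, entry_receiver) = channel();
    handle_batch(&mut processor, &entry_sender).unwrap();
    assert_eq!(entry_receiver.recv().unwrap().id, 0);
}
```

This is also what lets the test above collect the ledger as `vec![entry0, entry1]` rather than draining `event_processor.output`.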
diff --git a/src/request_stage.rs b/src/request_stage.rs
index 3695fd01a2..068ce3e812 100644
--- a/src/request_stage.rs
+++ b/src/request_stage.rs
@@ -14,7 +14,7 @@ use signature::PublicKey;
 use std::collections::VecDeque;
 use std::net::{SocketAddr, UdpSocket};
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::mpsc::{channel, Receiver};
+use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::{Arc, Mutex};
 use std::thread::{spawn, JoinHandle};
 use std::time::Duration;
@@ -207,6 +207,7 @@ impl RequestProcessor {
         &self,
         event_processor: &EventProcessor,
         verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
+        entry_sender: &Sender<Entry>,
         responder_sender: &streamer::BlobSender,
         packet_recycler: &packet::PacketRecycler,
         blob_recycler: &packet::BlobRecycler,
@@ -240,7 +241,8 @@ impl RequestProcessor {
         debug!("events: {} reqs: {}", events.len(), reqs.len());
 
         debug!("process_events");
-        event_processor.process_events(events)?;
+        let entry = event_processor.process_events(events)?;
+        entry_sender.send(entry)?;
         debug!("done process_events");
 
         debug!("process_requests");
@@ -271,6 +273,7 @@ impl RequestProcessor {
 
 pub struct RequestStage {
     pub thread_hdl: JoinHandle<()>,
+    pub entry_receiver: Receiver<Entry>,
     pub output: streamer::BlobReceiver,
     pub request_processor: Arc<RequestProcessor>,
 }
@@ -286,11 +289,13 @@ impl RequestStage {
     ) -> Self {
         let request_processor = Arc::new(request_processor);
         let request_processor_ = request_processor.clone();
+        let (entry_sender, entry_receiver) = channel();
         let (responder_sender, output) = channel();
         let thread_hdl = spawn(move || loop {
             let e = request_processor_.process_request_packets(
                 &event_processor,
                 &verified_receiver,
+                &entry_sender,
                 &responder_sender,
                 &packet_recycler,
                 &blob_recycler,
@@ -303,6 +308,7 @@ impl RequestStage {
         });
         RequestStage {
             thread_hdl,
+            entry_receiver,
             output,
             request_processor,
         }
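`RequestStage` now creates the entry channel itself, moves the `Sender` into its worker thread, and exposes the `Receiver` as a public field for the next stage to take. A generic sketch of that stage pattern under hypothetical names (`Stage`, `String` entries), not the repository's actual types:

```rust
// Generic sketch of the pattern RequestStage follows: construct the channel in
// the constructor, move the Sender into the worker thread, and expose the
// Receiver as a public field for downstream consumers.
use std::sync::mpsc::{channel, Receiver};
use std::thread::{spawn, JoinHandle};

pub struct Stage {
    pub thread_hdl: JoinHandle<()>,
    pub entry_receiver: Receiver<String>,
}

impl Stage {
    pub fn new() -> Self {
        let (entry_sender, entry_receiver) = channel();
        let thread_hdl = spawn(move || {
            // The worker owns the Sender; a send error means the receiver is
            // gone, so the worker simply stops.
            for i in 0..3 {
                if entry_sender.send(format!("entry {}", i)).is_err() {
                    break;
                }
            }
        });
        Stage {
            thread_hdl,
            entry_receiver,
        }
    }
}

fn main() {
    let stage = Stage::new();
    // A downstream stage takes the receiver straight off the struct.
    let entries: Vec<String> = stage.entry_receiver.iter().take(3).collect();
    assert_eq!(entries.len(), 3);
    stage.thread_hdl.join().unwrap();
}
```

Downstream code can then move `entry_receiver` wherever it is needed, which is what `rpu.rs` and `tvu.rs` do below.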
diff --git a/src/rpu.rs b/src/rpu.rs
index 5fee29e0b5..e4e275eee7 100644
--- a/src/rpu.rs
+++ b/src/rpu.rs
@@ -2,6 +2,7 @@
 //! 5-stage transaction processing pipeline in software.
 
 use crdt::{Crdt, ReplicatedData};
+use entry::Entry;
 use entry_writer::EntryWriter;
 use event_processor::EventProcessor;
 use packet;
@@ -11,7 +12,7 @@ use sig_verify_stage::SigVerifyStage;
 use std::io::Write;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::mpsc::channel;
+use std::sync::mpsc::{channel, Receiver};
 use std::sync::{Arc, Mutex, RwLock};
 use std::thread::{spawn, JoinHandle};
 use streamer;
@@ -35,10 +36,16 @@ impl Rpu {
         broadcast: streamer::BlobSender,
         blob_recycler: packet::BlobRecycler,
         writer: Mutex<W>,
+        entry_receiver: Receiver<Entry>,
     ) -> JoinHandle<()> {
         spawn(move || loop {
             let entry_writer = EntryWriter::new(&event_processor, &request_processor);
-            let _ = entry_writer.write_and_send_entries(&broadcast, &blob_recycler, &writer);
+            let _ = entry_writer.write_and_send_entries(
+                &broadcast,
+                &blob_recycler,
+                &writer,
+                &entry_receiver,
+            );
             if exit.load(Ordering::Relaxed) {
                 info!("broadcat_service exiting");
                 break;
@@ -95,6 +102,7 @@ impl Rpu {
             broadcast_sender,
             blob_recycler.clone(),
             Mutex::new(writer),
+            request_stage.entry_receiver,
         );
 
         let broadcast_socket = UdpSocket::bind(local)?;
diff --git a/src/tvu.rs b/src/tvu.rs
index 0d59ee3716..869a9ba4ea 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -2,6 +2,7 @@
 //! 5-stage transaction validation pipeline in software.
 
 use crdt::{Crdt, ReplicatedData};
+use entry::Entry;
 use entry_writer::EntryWriter;
 use event_processor::EventProcessor;
 use ledger;
@@ -11,7 +12,7 @@ use result::Result;
 use sig_verify_stage::SigVerifyStage;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::mpsc::channel;
+use std::sync::mpsc::{channel, Receiver};
 use std::sync::{Arc, RwLock};
 use std::thread::{spawn, JoinHandle};
 use std::time::Duration;
@@ -33,11 +34,12 @@ impl Tvu {
         event_processor: Arc<EventProcessor>,
         request_processor: Arc<RequestProcessor>,
         exit: Arc<AtomicBool>,
+        entry_receiver: Receiver<Entry>,
     ) -> JoinHandle<()> {
         spawn(move || {
             let entry_writer = EntryWriter::new(&event_processor, &request_processor);
             loop {
-                let _ = entry_writer.drain_entries();
+                let _ = entry_writer.drain_entries(&entry_receiver);
                 if exit.load(Ordering::Relaxed) {
                     info!("drain_service exiting");
                     break;
@@ -181,6 +183,7 @@ impl Tvu {
             obj.event_processor.clone(),
             request_stage.request_processor.clone(),
             exit.clone(),
+            request_stage.entry_receiver,
        );
 
         let t_responder = streamer::responder(
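Putting the pieces together, the receiver exposed by the request stage ends up moved into a long-running service thread, as in `Rpu::write_service` and the Tvu drain loop above. A standalone sketch of that wiring with illustrative names (`writer_service`, `String` entries); note the real code loops on an exit flag, while this sketch stops on channel disconnect instead:

```rust
// Standalone sketch of the end-to-end flow this patch sets up: a producer
// thread owns the Sender (as RequestStage does), and the Receiver is moved
// into a writer thread, which runs until the sender side hangs up.
use std::io::Write;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};

fn writer_service<W: Write + Send + 'static>(
    writer: Arc<Mutex<W>>,
    entry_receiver: Receiver<String>,
) -> JoinHandle<()> {
    spawn(move || {
        // recv() returns Err once every Sender has been dropped, which serves
        // as the shutdown signal in this sketch.
        while let Ok(entry) = entry_receiver.recv() {
            writeln!(writer.lock().unwrap(), "{}", entry).unwrap();
        }
    })
}

fn main() {
    let (entry_sender, entry_receiver) = channel();
    let writer = Arc::new(Mutex::new(Vec::<u8>::new()));
    let handle = writer_service(writer.clone(), entry_receiver);

    // Producer side, standing in for the request stage's worker thread.
    let producer = spawn(move || {
        for i in 0..3 {
            entry_sender.send(format!("entry {}", i)).unwrap();
        }
        // Dropping the sender here closes the channel and stops the writer.
    });

    producer.join().unwrap();
    handle.join().unwrap();
    assert!(!writer.lock().unwrap().is_empty());
}
```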