diff --git a/src/entry_writer.rs b/src/entry_writer.rs index 24a13e179..9f8b8dac3 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -1,7 +1,7 @@ //! The `entry_writer` module helps implement the TPU's write stage. +use accountant::Accountant; use entry::Entry; -use event_processor::EventProcessor; use ledger; use packet; use request_stage::RequestProcessor; @@ -16,25 +16,22 @@ use std::time::Duration; use streamer; pub struct EntryWriter<'a> { - event_processor: &'a EventProcessor, + accountant: &'a Accountant, request_processor: &'a RequestProcessor, } impl<'a> EntryWriter<'a> { /// Create a new Tpu that wraps the given Accountant. - pub fn new( - event_processor: &'a EventProcessor, - request_processor: &'a RequestProcessor, - ) -> Self { + pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self { EntryWriter { - event_processor, + accountant, request_processor, } } fn write_entry(&self, writer: &Mutex, entry: &Entry) { trace!("write_entry entry"); - self.event_processor.accountant.register_entry_id(&entry.id); + self.accountant.register_entry_id(&entry.id); writeln!( writer.lock().expect("'writer' lock in fn fn write_entry"), "{}", diff --git a/src/rpu.rs b/src/rpu.rs index fe7a2e5eb..99707b8a8 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -1,6 +1,7 @@ //! The `rpu` module implements the Request Processing Unit, a //! 5-stage transaction processing pipeline in software. +use accountant::Accountant; use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; @@ -30,7 +31,7 @@ impl Rpu { } fn write_service( - event_processor: Arc, + accountant: Arc, request_processor: Arc, exit: Arc, broadcast: streamer::BlobSender, @@ -39,7 +40,7 @@ impl Rpu { entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - let entry_writer = EntryWriter::new(&event_processor, &request_processor); + let entry_writer = EntryWriter::new(&accountant, &request_processor); let _ = entry_writer.write_and_send_entries( &broadcast, &blob_recycler, @@ -96,7 +97,7 @@ impl Rpu { let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( - self.event_processor.clone(), + self.event_processor.accountant.clone(), request_stage.request_processor.clone(), exit.clone(), broadcast_sender, diff --git a/src/thin_client.rs b/src/thin_client.rs index 318377ae3..3517fcb81 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -191,7 +191,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); - let rpu = Arc::new(Rpu::new(event_processor)); + let rpu = Rpu::new(event_processor); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); @@ -229,7 +229,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); - let rpu = Arc::new(Rpu::new(event_processor)); + let rpu = Rpu::new(event_processor); let serve_addr = leader_serve.local_addr().unwrap(); let threads = rpu.serve( leader_data, @@ -299,7 +299,7 @@ mod tests { let leader_acc = { let accountant = Accountant::new(&alice); let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); - Arc::new(Rpu::new(event_processor)) + Rpu::new(event_processor) }; let replicant_acc = { diff --git a/src/tvu.rs b/src/tvu.rs index 59bb599a3..0716ae725 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -1,6 +1,7 @@ //! The `tvu` module implements the Transaction Validation Unit, a //! 5-stage transaction validation pipeline in software. +use accountant::Accountant; use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; @@ -31,13 +32,13 @@ impl Tvu { } fn drain_service( - event_processor: Arc, + accountant: Arc, request_processor: Arc, exit: Arc, entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || { - let entry_writer = EntryWriter::new(&event_processor, &request_processor); + let entry_writer = EntryWriter::new(&accountant, &request_processor); loop { let _ = entry_writer.drain_entries(&entry_receiver); if exit.load(Ordering::Relaxed) { @@ -180,7 +181,7 @@ impl Tvu { ); let t_write = Self::drain_service( - obj.event_processor.clone(), + obj.event_processor.accountant.clone(), request_stage.request_processor.clone(), exit.clone(), request_stage.entry_receiver,