diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 41a871ea0..74056b13b 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -7,10 +7,10 @@ extern crate solana; use getopts::Options; use isatty::stdin_isatty; use solana::accountant::Accountant; -use solana::accounting_stage::AccountingStage; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; +use solana::event_processor::EventProcessor; use solana::rpu::Rpu; use solana::signature::{KeyPair, KeyPairUtil}; use std::env; @@ -115,9 +115,9 @@ fn main() { eprintln!("creating networking stack..."); - let accounting_stage = AccountingStage::new(accountant, &last_id, Some(1000)); + let event_processor = EventProcessor::new(accountant, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Arc::new(Rpu::new(accounting_stage)); + let rpu = Rpu::new(event_processor); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); diff --git a/src/entry_writer.rs b/src/entry_writer.rs index 48b5d195f..42fe5d53f 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -1,7 +1,7 @@ //! The `entry_writer` module helps implement the TPU's write stage. -use accounting_stage::AccountingStage; use entry::Entry; +use event_processor::EventProcessor; use ledger; use packet; use request_stage::RequestProcessor; @@ -15,27 +15,25 @@ use std::time::Duration; use streamer; pub struct EntryWriter<'a> { - accounting_stage: &'a AccountingStage, + event_processor: &'a EventProcessor, request_processor: &'a RequestProcessor, } impl<'a> EntryWriter<'a> { /// Create a new Tpu that wraps the given Accountant. pub fn new( - accounting_stage: &'a AccountingStage, + event_processor: &'a EventProcessor, request_processor: &'a RequestProcessor, ) -> Self { EntryWriter { - accounting_stage, + event_processor, request_processor, } } fn write_entry(&self, writer: &Mutex, entry: &Entry) { trace!("write_entry entry"); - self.accounting_stage - .accountant - .register_entry_id(&entry.id); + self.event_processor.accountant.register_entry_id(&entry.id); writeln!( writer.lock().expect("'writer' lock in fn fn write_entry"), "{}", @@ -47,14 +45,14 @@ impl<'a> EntryWriter<'a> { fn write_entries(&self, writer: &Mutex) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; - let entry = self.accounting_stage + let entry = self.event_processor .output .lock() .expect("'ouput' lock in fn receive_all") .recv_timeout(Duration::new(1, 0))?; self.write_entry(writer, &entry); l.push(entry); - while let Ok(entry) = self.accounting_stage + while let Ok(entry) = self.event_processor .output .lock() .expect("'output' lock in fn write_entries") diff --git a/src/accounting_stage.rs b/src/event_processor.rs similarity index 87% rename from src/accounting_stage.rs rename to src/event_processor.rs index 20a95b6a3..df547af3d 100644 --- a/src/accounting_stage.rs +++ b/src/event_processor.rs @@ -1,4 +1,4 @@ -//! The `accounting_stage` module implements the accounting stage of the TPU. +//! The `event_processor` module implements the accounting stage of the TPU. use accountant::Accountant; use entry::Entry; @@ -10,7 +10,7 @@ use result::Result; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; -pub struct AccountingStage { +pub struct EventProcessor { pub output: Mutex>, entry_sender: Mutex>, pub accountant: Arc, @@ -18,13 +18,13 @@ pub struct AccountingStage { historian: Mutex, } -impl AccountingStage { +impl EventProcessor { /// Create a new stage of the TPU for event and transaction processing pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option) -> Self { let (historian_input, event_receiver) = channel(); let historian = Historian::new(event_receiver, start_hash, ms_per_tick); let (entry_sender, output) = channel(); - AccountingStage { + EventProcessor { output: Mutex::new(output), entry_sender: Mutex::new(entry_sender), accountant: Arc::new(accountant), @@ -52,9 +52,9 @@ impl AccountingStage { #[cfg(test)] mod tests { use accountant::Accountant; - use accounting_stage::AccountingStage; use entry::Entry; use event::Event; + use event_processor::EventProcessor; use mint::Mint; use signature::{KeyPair, KeyPairUtil}; use transaction::Transaction; @@ -66,22 +66,22 @@ mod tests { // Entry OR if the verifier tries to parallelize across multiple Entries. let mint = Mint::new(2); let accountant = Accountant::new(&mint); - let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None); + let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); let events = vec![Event::Transaction(tr)]; - assert!(accounting_stage.process_events(events).is_ok()); + assert!(event_processor.process_events(events).is_ok()); // Process a second batch that spends one of those tokens. let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); let events = vec![Event::Transaction(tr)]; - assert!(accounting_stage.process_events(events).is_ok()); + assert!(event_processor.process_events(events).is_ok()); // Collect the ledger and feed it to a new accountant. - drop(accounting_stage.entry_sender); - let entries: Vec = accounting_stage.output.lock().unwrap().iter().collect(); + drop(event_processor.entry_sender); + let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); // Assert the user holds one token, not two. If the server only output one // entry, then the second transaction will be rejected, because it drives @@ -104,8 +104,8 @@ mod bench { extern crate test; use self::test::Bencher; use accountant::{Accountant, MAX_ENTRY_IDS}; - use accounting_stage::*; use bincode::serialize; + use event_processor::*; use hash::hash; use mint::Mint; use rayon::prelude::*; @@ -154,17 +154,17 @@ mod bench { .map(|tr| Event::Transaction(tr)) .collect(); - let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None); + let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); let now = Instant::now(); - assert!(accounting_stage.process_events(events).is_ok()); + assert!(event_processor.process_events(events).is_ok()); let duration = now.elapsed(); let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; let tps = txs as f64 / sec; // Ensure that all transactions were successfully logged. - drop(accounting_stage.historian_input); - let entries: Vec = accounting_stage.output.lock().unwrap().iter().collect(); + drop(event_processor.historian_input); + let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].events.len(), txs as usize); diff --git a/src/lib.rs b/src/lib.rs index 30f87959f..522cb33fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,5 @@ #![cfg_attr(feature = "unstable", feature(test))] pub mod accountant; -pub mod accounting_stage; pub mod crdt; pub mod ecdsa; pub mod entry; @@ -8,6 +7,7 @@ pub mod entry_writer; #[cfg(feature = "erasure")] pub mod erasure; pub mod event; +pub mod event_processor; pub mod hash; pub mod historian; pub mod ledger; diff --git a/src/request_stage.rs b/src/request_stage.rs index 30e6dae52..3695fd01a 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -1,10 +1,10 @@ //! The `request_stage` processes thin client Request messages. use accountant::Accountant; -use accounting_stage::AccountingStage; use bincode::{deserialize, serialize}; use entry::Entry; use event::Event; +use event_processor::EventProcessor; use hash::Hash; use packet; use packet::SharedPackets; @@ -205,7 +205,7 @@ impl RequestProcessor { pub fn process_request_packets( &self, - accounting_stage: &AccountingStage, + event_processor: &EventProcessor, verified_receiver: &Receiver)>>, responder_sender: &streamer::BlobSender, packet_recycler: &packet::PacketRecycler, @@ -240,7 +240,7 @@ impl RequestProcessor { debug!("events: {} reqs: {}", events.len(), reqs.len()); debug!("process_events"); - accounting_stage.process_events(events)?; + event_processor.process_events(events)?; debug!("done process_events"); debug!("process_requests"); @@ -278,7 +278,7 @@ pub struct RequestStage { impl RequestStage { pub fn new( request_processor: RequestProcessor, - accounting_stage: Arc, + event_processor: Arc, exit: Arc, verified_receiver: Receiver)>>, packet_recycler: packet::PacketRecycler, @@ -289,7 +289,7 @@ impl RequestStage { let (responder_sender, output) = channel(); let thread_hdl = spawn(move || loop { let e = request_processor_.process_request_packets( - &accounting_stage, + &event_processor, &verified_receiver, &responder_sender, &packet_recycler, diff --git a/src/rpu.rs b/src/rpu.rs index 5e5bcb037..5fee29e0b 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -1,9 +1,9 @@ //! The `rpu` module implements the Request Processing Unit, a //! 5-stage transaction processing pipeline in software. -use accounting_stage::AccountingStage; use crdt::{Crdt, ReplicatedData}; use entry_writer::EntryWriter; +use event_processor::EventProcessor; use packet; use request_stage::{RequestProcessor, RequestStage}; use result::Result; @@ -17,19 +17,19 @@ use std::thread::{spawn, JoinHandle}; use streamer; pub struct Rpu { - accounting_stage: Arc, + event_processor: Arc, } impl Rpu { /// Create a new Rpu that wraps the given Accountant. - pub fn new(accounting_stage: AccountingStage) -> Self { + pub fn new(event_processor: EventProcessor) -> Self { Rpu { - accounting_stage: Arc::new(accounting_stage), + event_processor: Arc::new(event_processor), } } fn write_service( - accounting_stage: Arc, + event_processor: Arc, request_processor: Arc, exit: Arc, broadcast: streamer::BlobSender, @@ -37,7 +37,7 @@ impl Rpu { writer: Mutex, ) -> JoinHandle<()> { spawn(move || loop { - let entry_writer = EntryWriter::new(&accounting_stage, &request_processor); + let entry_writer = EntryWriter::new(&event_processor, &request_processor); let _ = entry_writer.write_and_send_entries(&broadcast, &blob_recycler, &writer); if exit.load(Ordering::Relaxed) { info!("broadcat_service exiting"); @@ -77,10 +77,10 @@ impl Rpu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let blob_recycler = packet::BlobRecycler::default(); - let request_processor = RequestProcessor::new(self.accounting_stage.accountant.clone()); + let request_processor = RequestProcessor::new(self.event_processor.accountant.clone()); let request_stage = RequestStage::new( request_processor, - self.accounting_stage.clone(), + self.event_processor.clone(), exit.clone(), sig_verify_stage.output, packet_recycler.clone(), @@ -89,7 +89,7 @@ impl Rpu { let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( - self.accounting_stage.clone(), + self.event_processor.clone(), request_stage.request_processor.clone(), exit.clone(), broadcast_sender, diff --git a/src/thin_client.rs b/src/thin_client.rs index e4d56be7e..318377ae3 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -155,8 +155,8 @@ impl ThinClient { mod tests { use super::*; use accountant::Accountant; - use accounting_stage::AccountingStage; use crdt::{Crdt, ReplicatedData}; + use event_processor::EventProcessor; use futures::Future; use logger; use mint::Mint; @@ -190,8 +190,8 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - let rpu = Arc::new(Rpu::new(accounting_stage)); + let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let rpu = Arc::new(Rpu::new(event_processor)); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); @@ -228,8 +228,8 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - let rpu = Arc::new(Rpu::new(accounting_stage)); + let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let rpu = Arc::new(Rpu::new(event_processor)); let serve_addr = leader_serve.local_addr().unwrap(); let threads = rpu.serve( leader_data, @@ -298,14 +298,14 @@ mod tests { let leader_acc = { let accountant = Accountant::new(&alice); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - Arc::new(Rpu::new(accounting_stage)) + let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + Arc::new(Rpu::new(event_processor)) }; let replicant_acc = { let accountant = Accountant::new(&alice); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - Arc::new(Tvu::new(accounting_stage)) + let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + Arc::new(Tvu::new(event_processor)) }; let leader_threads = leader_acc diff --git a/src/tvu.rs b/src/tvu.rs index b085494cb..0d59ee371 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -1,9 +1,9 @@ //! The `tvu` module implements the Transaction Validation Unit, a //! 5-stage transaction validation pipeline in software. -use accounting_stage::AccountingStage; use crdt::{Crdt, ReplicatedData}; use entry_writer::EntryWriter; +use event_processor::EventProcessor; use ledger; use packet; use request_stage::{RequestProcessor, RequestStage}; @@ -18,24 +18,24 @@ use std::time::Duration; use streamer; pub struct Tvu { - accounting_stage: Arc, + event_processor: Arc, } impl Tvu { /// Create a new Tvu that wraps the given Accountant. - pub fn new(accounting_stage: AccountingStage) -> Self { + pub fn new(event_processor: EventProcessor) -> Self { Tvu { - accounting_stage: Arc::new(accounting_stage), + event_processor: Arc::new(event_processor), } } fn drain_service( - accounting_stage: Arc, + event_processor: Arc, request_processor: Arc, exit: Arc, ) -> JoinHandle<()> { spawn(move || { - let entry_writer = EntryWriter::new(&accounting_stage, &request_processor); + let entry_writer = EntryWriter::new(&event_processor, &request_processor); loop { let _ = entry_writer.drain_entries(); if exit.load(Ordering::Relaxed) { @@ -57,7 +57,7 @@ impl Tvu { let blobs = verified_receiver.recv_timeout(timer)?; trace!("replicating blobs {}", blobs.len()); let entries = ledger::reconstruct_entries_from_blobs(&blobs); - obj.accounting_stage + obj.event_processor .accountant .process_verified_entries(entries)?; for blob in blobs { @@ -167,10 +167,10 @@ impl Tvu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - let request_processor = RequestProcessor::new(obj.accounting_stage.accountant.clone()); + let request_processor = RequestProcessor::new(obj.event_processor.accountant.clone()); let request_stage = RequestStage::new( request_processor, - obj.accounting_stage.clone(), + obj.event_processor.clone(), exit.clone(), sig_verify_stage.output, packet_recycler.clone(), @@ -178,7 +178,7 @@ impl Tvu { ); let t_write = Self::drain_service( - obj.accounting_stage.clone(), + obj.event_processor.clone(), request_stage.request_processor.clone(), exit.clone(), ); @@ -230,12 +230,12 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke #[cfg(test)] mod tests { use accountant::Accountant; - use accounting_stage::AccountingStage; use bincode::serialize; use chrono::prelude::*; use crdt::Crdt; use entry; use event::Event; + use event_processor::EventProcessor; use hash::{hash, Hash}; use logger; use mint::Mint; @@ -302,8 +302,8 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); let accountant = Accountant::new(&alice); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - let tvu = Arc::new(Tvu::new(accounting_stage)); + let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let tvu = Arc::new(Tvu::new(event_processor)); let replicate_addr = target1_data.replicate_addr; let threads = Tvu::serve( &tvu, @@ -328,7 +328,7 @@ mod tests { w.set_index(i).unwrap(); w.set_id(leader_id).unwrap(); - let accountant = &tvu.accounting_stage.accountant; + let accountant = &tvu.event_processor.accountant; let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); @@ -370,7 +370,7 @@ mod tests { msgs.push(msg); } - let accountant = &tvu.accounting_stage.accountant; + let accountant = &tvu.event_processor.accountant; let alice_balance = accountant.get_balance(&alice.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance);