diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index b298ca4157..ac8b74d10a 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -10,7 +10,6 @@ use solana::accountant::Accountant; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; -use solana::event_processor::EventProcessor; use solana::rpu::Rpu; use solana::signature::{KeyPair, KeyPairUtil}; use std::env; @@ -116,10 +115,8 @@ fn main() { eprintln!("creating networking stack..."); - let event_processor = - EventProcessor::new(accountant, &last_id, Some(Duration::from_millis(1000))); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(event_processor); + let rpu = Rpu::new(accountant, last_id, Some(Duration::from_millis(1000))); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); diff --git a/src/rpu.rs b/src/rpu.rs index 8019f7d0e4..6bc7aa5226 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -5,7 +5,7 @@ use accountant::Accountant; use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; -use event_processor::EventProcessor; +use hash::Hash; use packet; use record_stage::RecordStage; use request_processor::RequestProcessor; @@ -18,17 +18,22 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; +use std::time::Duration; use streamer; pub struct Rpu { - event_processor: Arc, + accountant: Arc, + start_hash: Hash, + tick_duration: Option, } impl Rpu { /// Create a new Rpu that wraps the given Accountant. - pub fn new(event_processor: EventProcessor) -> Self { + pub fn new(accountant: Accountant, start_hash: Hash, tick_duration: Option) -> Self { Rpu { - event_processor: Arc::new(event_processor), + accountant: Arc::new(accountant), + start_hash, + tick_duration, } } @@ -86,7 +91,7 @@ impl Rpu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let blob_recycler = packet::BlobRecycler::default(); - let request_processor = RequestProcessor::new(self.event_processor.accountant.clone()); + let request_processor = RequestProcessor::new(self.accountant.clone()); let request_stage = RequestStage::new( request_processor, exit.clone(), @@ -97,13 +102,13 @@ impl Rpu { let record_stage = RecordStage::new( request_stage.signal_receiver, - &self.event_processor.start_hash, - self.event_processor.tick_duration, + &self.start_hash, + self.tick_duration, ); let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( - self.event_processor.accountant.clone(), + self.accountant.clone(), exit.clone(), broadcast_sender, blob_recycler.clone(), diff --git a/src/thin_client.rs b/src/thin_client.rs index 87cc37b856..a682dfb22e 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -191,12 +191,7 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let event_processor = EventProcessor::new( - accountant, - &alice.last_id(), - Some(Duration::from_millis(30)), - ); - let rpu = Rpu::new(event_processor); + let rpu = Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30))); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(900)); @@ -234,12 +229,7 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let event_processor = EventProcessor::new( - accountant, - &alice.last_id(), - Some(Duration::from_millis(30)), - ); - let rpu = Rpu::new(event_processor); + let rpu = Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30))); let serve_addr = leader_serve.local_addr().unwrap(); let threads = rpu.serve( leader_data, @@ -308,12 +298,7 @@ mod tests { let leader_acc = { let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new( - accountant, - &alice.last_id(), - Some(Duration::from_millis(30)), - ); - Rpu::new(event_processor) + Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30))) }; let replicant_acc = {