Purge EventProcessor from RPU

This commit is contained in:
Greg Fitzgerald 2018-05-14 14:35:25 -06:00
parent 17cc9ab07f
commit 685de30047
3 changed files with 17 additions and 30 deletions

View File

@ -10,7 +10,6 @@ use solana::accountant::Accountant;
use solana::crdt::ReplicatedData; use solana::crdt::ReplicatedData;
use solana::entry::Entry; use solana::entry::Entry;
use solana::event::Event; use solana::event::Event;
use solana::event_processor::EventProcessor;
use solana::rpu::Rpu; use solana::rpu::Rpu;
use solana::signature::{KeyPair, KeyPairUtil}; use solana::signature::{KeyPair, KeyPairUtil};
use std::env; use std::env;
@ -116,10 +115,8 @@ fn main() {
eprintln!("creating networking stack..."); eprintln!("creating networking stack...");
let event_processor =
EventProcessor::new(accountant, &last_id, Some(Duration::from_millis(1000)));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let rpu = Rpu::new(event_processor); let rpu = Rpu::new(accountant, last_id, Some(Duration::from_millis(1000)));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();

View File

@ -5,7 +5,7 @@ use accountant::Accountant;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use entry::Entry; use entry::Entry;
use entry_writer::EntryWriter; use entry_writer::EntryWriter;
use event_processor::EventProcessor; use hash::Hash;
use packet; use packet;
use record_stage::RecordStage; use record_stage::RecordStage;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
@ -18,17 +18,22 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer; use streamer;
pub struct Rpu { pub struct Rpu {
event_processor: Arc<EventProcessor>, accountant: Arc<Accountant>,
start_hash: Hash,
tick_duration: Option<Duration>,
} }
impl Rpu { impl Rpu {
/// Create a new Rpu that wraps the given Accountant. /// Create a new Rpu that wraps the given Accountant.
pub fn new(event_processor: EventProcessor) -> Self { pub fn new(accountant: Accountant, start_hash: Hash, tick_duration: Option<Duration>) -> Self {
Rpu { Rpu {
event_processor: Arc::new(event_processor), accountant: Arc::new(accountant),
start_hash,
tick_duration,
} }
} }
@ -86,7 +91,7 @@ impl Rpu {
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = packet::BlobRecycler::default();
let request_processor = RequestProcessor::new(self.event_processor.accountant.clone()); let request_processor = RequestProcessor::new(self.accountant.clone());
let request_stage = RequestStage::new( let request_stage = RequestStage::new(
request_processor, request_processor,
exit.clone(), exit.clone(),
@ -97,13 +102,13 @@ impl Rpu {
let record_stage = RecordStage::new( let record_stage = RecordStage::new(
request_stage.signal_receiver, request_stage.signal_receiver,
&self.event_processor.start_hash, &self.start_hash,
self.event_processor.tick_duration, self.tick_duration,
); );
let (broadcast_sender, broadcast_receiver) = channel(); let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service( let t_write = Self::write_service(
self.event_processor.accountant.clone(), self.accountant.clone(),
exit.clone(), exit.clone(),
broadcast_sender, broadcast_sender,
blob_recycler.clone(), blob_recycler.clone(),

View File

@ -191,12 +191,7 @@ mod tests {
let accountant = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let event_processor = EventProcessor::new( let rpu = Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30)));
accountant,
&alice.last_id(),
Some(Duration::from_millis(30)),
);
let rpu = Rpu::new(event_processor);
let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
@ -234,12 +229,7 @@ mod tests {
let accountant = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let event_processor = EventProcessor::new( let rpu = Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30)));
accountant,
&alice.last_id(),
Some(Duration::from_millis(30)),
);
let rpu = Rpu::new(event_processor);
let serve_addr = leader_serve.local_addr().unwrap(); let serve_addr = leader_serve.local_addr().unwrap();
let threads = rpu.serve( let threads = rpu.serve(
leader_data, leader_data,
@ -308,12 +298,7 @@ mod tests {
let leader_acc = { let leader_acc = {
let accountant = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let event_processor = EventProcessor::new( Rpu::new(accountant, alice.last_id(), Some(Duration::from_millis(30)))
accountant,
&alice.last_id(),
Some(Duration::from_millis(30)),
);
Rpu::new(event_processor)
}; };
let replicant_acc = { let replicant_acc = {