Add historian to pipeline

No longer intercept entries to register_entry_id(). Intead,
register the ID in the Write stage.

EventProcessor is now just being used as a place to store data.

Fixes #216
This commit is contained in:
Greg Fitzgerald 2018-05-14 12:43:38 -06:00
parent a578c1a5e3
commit a2c05b112e
6 changed files with 38 additions and 30 deletions

View File

@ -14,6 +14,8 @@ pub struct EventProcessor {
pub accountant: Arc<Accountant>, pub accountant: Arc<Accountant>,
historian_input: Mutex<Sender<Signal>>, historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>, historian: Mutex<Historian>,
pub start_hash: Hash,
pub ms_per_tick: Option<u64>,
} }
impl EventProcessor { impl EventProcessor {
@ -25,6 +27,8 @@ impl EventProcessor {
accountant: Arc::new(accountant), accountant: Arc::new(accountant),
historian_input: Mutex::new(historian_input), historian_input: Mutex::new(historian_input),
historian: Mutex::new(historian), historian: Mutex::new(historian),
start_hash: *start_hash,
ms_per_tick,
} }
} }
@ -37,7 +41,7 @@ impl EventProcessor {
sender.send(Signal::Events(events))?; sender.send(Signal::Events(events))?;
// Wait for the historian to tag our Events with an ID and then register it. // Wait for the historian to tag our Events with an ID and then register it.
let entry = historian.entry_receiver.lock().unwrap().recv()?; let entry = historian.entry_receiver.recv()?;
self.accountant.register_entry_id(&entry.id); self.accountant.register_entry_id(&entry.id);
Ok(entry) Ok(entry)
} }

View File

@ -4,13 +4,12 @@
use entry::Entry; use entry::Entry;
use hash::Hash; use hash::Hash;
use recorder::{ExitReason, Recorder, Signal}; use recorder::{ExitReason, Recorder, Signal};
use std::sync::Mutex;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Instant; use std::time::Instant;
pub struct Historian { pub struct Historian {
pub entry_receiver: Mutex<Receiver<Entry>>, pub entry_receiver: Receiver<Entry>,
pub thread_hdl: JoinHandle<ExitReason>, pub thread_hdl: JoinHandle<ExitReason>,
} }
@ -24,7 +23,7 @@ impl Historian {
let thread_hdl = let thread_hdl =
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
Historian { Historian {
entry_receiver: Mutex::new(entry_receiver), entry_receiver,
thread_hdl, thread_hdl,
} }
} }
@ -52,10 +51,7 @@ impl Historian {
} }
pub fn receive(self: &Self) -> Result<Entry, TryRecvError> { pub fn receive(self: &Self) -> Result<Entry, TryRecvError> {
self.entry_receiver self.entry_receiver.try_recv()
.lock()
.expect("'entry_receiver' lock in pub fn receive")
.try_recv()
} }
} }
@ -78,9 +74,9 @@ mod tests {
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
input.send(Signal::Tick).unwrap(); input.send(Signal::Tick).unwrap();
let entry0 = hist.entry_receiver.lock().unwrap().recv().unwrap(); let entry0 = hist.entry_receiver.recv().unwrap();
let entry1 = hist.entry_receiver.lock().unwrap().recv().unwrap(); let entry1 = hist.entry_receiver.recv().unwrap();
let entry2 = hist.entry_receiver.lock().unwrap().recv().unwrap(); let entry2 = hist.entry_receiver.recv().unwrap();
assert_eq!(entry0.num_hashes, 0); assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0); assert_eq!(entry1.num_hashes, 0);
@ -117,7 +113,7 @@ mod tests {
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
input.send(Signal::Tick).unwrap(); input.send(Signal::Tick).unwrap();
drop(input); drop(input);
let entries: Vec<Entry> = hist.entry_receiver.lock().unwrap().iter().collect(); let entries: Vec<Entry> = hist.entry_receiver.iter().collect();
assert!(entries.len() > 1); assert!(entries.len() > 1);
// Ensure the ID is not the seed. // Ensure the ID is not the seed.

View File

@ -2,12 +2,11 @@
use accountant::Accountant; use accountant::Accountant;
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use entry::Entry;
use event::Event; use event::Event;
use event_processor::EventProcessor;
use packet; use packet;
use packet::SharedPackets; use packet::SharedPackets;
use rayon::prelude::*; use rayon::prelude::*;
use recorder::Signal;
use request::{Request, Response}; use request::{Request, Response};
use result::Result; use result::Result;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -140,9 +139,8 @@ impl RequestProcessor {
pub fn process_request_packets( pub fn process_request_packets(
&self, &self,
event_processor: &EventProcessor,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
entry_sender: &Sender<Entry>, signal_sender: &Sender<Signal>,
blob_sender: &streamer::BlobSender, blob_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler, packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler, blob_recycler: &packet::BlobRecycler,
@ -176,8 +174,9 @@ impl RequestProcessor {
debug!("events: {} reqs: {}", events.len(), reqs.len()); debug!("events: {} reqs: {}", events.len(), reqs.len());
debug!("process_events"); debug!("process_events");
let entry = event_processor.process_events(events)?; let results = self.accountant.process_verified_events(events);
entry_sender.send(entry)?; let events = results.into_iter().filter_map(|x| x.ok()).collect();
signal_sender.send(Signal::Events(events))?;
debug!("done process_events"); debug!("done process_events");
debug!("process_requests"); debug!("process_requests");

View File

@ -1,9 +1,8 @@
//! The `request_stage` processes thin client Request messages. //! The `request_stage` processes thin client Request messages.
use entry::Entry;
use event_processor::EventProcessor;
use packet; use packet;
use packet::SharedPackets; use packet::SharedPackets;
use recorder::Signal;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -13,7 +12,7 @@ use streamer;
pub struct RequestStage { pub struct RequestStage {
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
pub entry_receiver: Receiver<Entry>, pub signal_receiver: Receiver<Signal>,
pub blob_receiver: streamer::BlobReceiver, pub blob_receiver: streamer::BlobReceiver,
pub request_processor: Arc<RequestProcessor>, pub request_processor: Arc<RequestProcessor>,
} }
@ -21,7 +20,6 @@ pub struct RequestStage {
impl RequestStage { impl RequestStage {
pub fn new( pub fn new(
request_processor: RequestProcessor, request_processor: RequestProcessor,
event_processor: Arc<EventProcessor>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
packet_recycler: packet::PacketRecycler, packet_recycler: packet::PacketRecycler,
@ -29,13 +27,12 @@ impl RequestStage {
) -> Self { ) -> Self {
let request_processor = Arc::new(request_processor); let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone(); let request_processor_ = request_processor.clone();
let (entry_sender, entry_receiver) = channel(); let (signal_sender, signal_receiver) = channel();
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop { let thread_hdl = spawn(move || loop {
let e = request_processor_.process_request_packets( let e = request_processor_.process_request_packets(
&event_processor,
&verified_receiver, &verified_receiver,
&entry_sender, &signal_sender,
&blob_sender, &blob_sender,
&packet_recycler, &packet_recycler,
&blob_recycler, &blob_recycler,
@ -48,7 +45,7 @@ impl RequestStage {
}); });
RequestStage { RequestStage {
thread_hdl, thread_hdl,
entry_receiver, signal_receiver,
blob_receiver, blob_receiver,
request_processor, request_processor,
} }

View File

@ -6,6 +6,7 @@ use crdt::{Crdt, ReplicatedData};
use entry::Entry; use entry::Entry;
use entry_writer::EntryWriter; use entry_writer::EntryWriter;
use event_processor::EventProcessor; use event_processor::EventProcessor;
use historian::Historian;
use packet; use packet;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
use request_stage::RequestStage; use request_stage::RequestStage;
@ -88,13 +89,18 @@ impl Rpu {
let request_processor = RequestProcessor::new(self.event_processor.accountant.clone()); let request_processor = RequestProcessor::new(self.event_processor.accountant.clone());
let request_stage = RequestStage::new( let request_stage = RequestStage::new(
request_processor, request_processor,
self.event_processor.clone(),
exit.clone(), exit.clone(),
sig_verify_stage.verified_receiver, sig_verify_stage.verified_receiver,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),
); );
let historian_stage = Historian::new(
request_stage.signal_receiver,
&self.event_processor.start_hash,
self.event_processor.ms_per_tick,
);
let (broadcast_sender, broadcast_receiver) = channel(); let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service( let t_write = Self::write_service(
self.event_processor.accountant.clone(), self.event_processor.accountant.clone(),
@ -102,7 +108,7 @@ impl Rpu {
broadcast_sender, broadcast_sender,
blob_recycler.clone(), blob_recycler.clone(),
Mutex::new(writer), Mutex::new(writer),
request_stage.entry_receiver, historian_stage.entry_receiver,
); );
let broadcast_socket = UdpSocket::bind(local)?; let broadcast_socket = UdpSocket::bind(local)?;

View File

@ -6,6 +6,7 @@ use crdt::{Crdt, ReplicatedData};
use entry::Entry; use entry::Entry;
use entry_writer::EntryWriter; use entry_writer::EntryWriter;
use event_processor::EventProcessor; use event_processor::EventProcessor;
use historian::Historian;
use ledger; use ledger;
use packet; use packet;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
@ -173,17 +174,22 @@ impl Tvu {
let request_processor = RequestProcessor::new(obj.event_processor.accountant.clone()); let request_processor = RequestProcessor::new(obj.event_processor.accountant.clone());
let request_stage = RequestStage::new( let request_stage = RequestStage::new(
request_processor, request_processor,
obj.event_processor.clone(),
exit.clone(), exit.clone(),
sig_verify_stage.verified_receiver, sig_verify_stage.verified_receiver,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),
); );
let historian_stage = Historian::new(
request_stage.signal_receiver,
&obj.event_processor.start_hash,
obj.event_processor.ms_per_tick,
);
let t_write = Self::drain_service( let t_write = Self::drain_service(
obj.event_processor.accountant.clone(), obj.event_processor.accountant.clone(),
exit.clone(), exit.clone(),
request_stage.entry_receiver, historian_stage.entry_receiver,
); );
let t_responder = streamer::responder( let t_responder = streamer::responder(