Logger now only speaks when spoken to

Before this change, the logger's send channel could quickly be
flooded with Tick events. Those events should only be passed to
a writer.

Also, the log_event() function no longer sends entries. That
functionality moved to the new process_events() function. This
will allow us to initialize the with the genesis block without
flooding the send channel with events the historian won't read.
This commit is contained in:
Greg Fitzgerald 2018-03-05 10:30:05 -07:00
parent aa5f1699a7
commit c9e03f37ce
3 changed files with 30 additions and 38 deletions

View File

@ -167,7 +167,7 @@ mod tests {
drop(acc.historian.sender);
assert_eq!(
acc.historian.thread_hdl.join().unwrap().1,
acc.historian.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected
);
}
@ -191,7 +191,7 @@ mod tests {
drop(acc.historian.sender);
assert_eq!(
acc.historian.thread_hdl.join().unwrap().1,
acc.historian.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected
);
}
@ -209,7 +209,7 @@ mod tests {
drop(acc.historian.sender);
assert_eq!(
acc.historian.thread_hdl.join().unwrap().1,
acc.historian.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected
);
}

View File

@ -14,7 +14,7 @@ use std::fmt::Debug;
pub struct Historian<T> {
pub sender: SyncSender<Event<T>>,
pub receiver: Receiver<Entry<T>>,
pub thread_hdl: JoinHandle<(Entry<T>, ExitReason)>,
pub thread_hdl: JoinHandle<ExitReason>,
pub signatures: HashSet<Signature>,
}
@ -40,12 +40,12 @@ impl<T: 'static + Serialize + Clone + Debug + Send> Historian<T> {
ms_per_tick: Option<u64>,
receiver: Receiver<Event<T>>,
sender: SyncSender<Entry<T>>,
) -> JoinHandle<(Entry<T>, ExitReason)> {
) -> JoinHandle<ExitReason> {
spawn(move || {
let mut logger = Logger::new(receiver, sender, start_hash);
let now = Instant::now();
loop {
if let Err(err) = logger.log_events(now, ms_per_tick) {
if let Err(err) = logger.process_events(now, ms_per_tick) {
return err;
}
logger.last_id = hash(&logger.last_id);
@ -90,7 +90,7 @@ mod tests {
drop(hist.sender);
assert_eq!(
hist.thread_hdl.join().unwrap().1,
hist.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected
);
@ -104,7 +104,7 @@ mod tests {
drop(hist.receiver);
hist.sender.send(Event::Tick).unwrap();
assert_eq!(
hist.thread_hdl.join().unwrap().1,
hist.thread_hdl.join().unwrap(),
ExitReason::SendDisconnected
);
}
@ -127,15 +127,14 @@ mod tests {
let hist = Historian::new(&zero, Some(20));
sleep(Duration::from_millis(30));
hist.sender.send(Event::Tick).unwrap();
sleep(Duration::from_millis(15));
drop(hist.sender);
assert_eq!(
hist.thread_hdl.join().unwrap().1,
ExitReason::RecvDisconnected
);
let entries: Vec<Entry<Sha256Hash>> = hist.receiver.iter().collect();
assert!(entries.len() > 1);
assert!(verify_slice(&entries, &zero));
// Ensure one entry is sent back for each tick sent in.
assert_eq!(entries.len(), 1);
// Ensure the ID is not the seed, which indicates another Tick
// was logged before the one we sent.
assert_ne!(entries[0].id, zero);
}
}

View File

@ -41,19 +41,16 @@ impl<T: Serialize + Clone + Debug> Logger<T> {
}
}
pub fn log_event(&mut self, event: Event<T>) -> Result<(), (Entry<T>, ExitReason)> {
pub fn log_event(&mut self, event: Event<T>) -> Result<Entry<T>, ExitReason> {
let entry = create_entry_mut(&mut self.last_id, &mut self.num_hashes, event);
if let Err(_) = self.sender.send(entry.clone()) {
return Err((entry, ExitReason::SendDisconnected));
}
Ok(())
Ok(entry)
}
pub fn log_events(
pub fn process_events(
&mut self,
epoch: Instant,
ms_per_tick: Option<u64>,
) -> Result<(), (Entry<T>, ExitReason)> {
) -> Result<(), ExitReason> {
loop {
if let Some(ms) = ms_per_tick {
if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) {
@ -61,22 +58,17 @@ impl<T: Serialize + Clone + Debug> Logger<T> {
self.num_ticks += 1;
}
}
match self.receiver.try_recv() {
Ok(event) => {
self.log_event(event)?;
let entry = self.log_event(event)?;
self.sender
.send(entry)
.or(Err(ExitReason::SendDisconnected))?;
}
Err(TryRecvError::Empty) => {
return Ok(());
}
Err(TryRecvError::Disconnected) => {
let entry = Entry {
id: self.last_id,
num_hashes: self.num_hashes,
event: Event::Tick,
};
return Err((entry, ExitReason::RecvDisconnected));
}
}
Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(ExitReason::RecvDisconnected),
};
}
}
}
@ -98,12 +90,13 @@ mod tests {
}
fn run_genesis(gen: Genesis) -> Vec<Entry<u64>> {
let (_sender, event_receiver) = sync_channel(100);
let (sender, event_receiver) = sync_channel(100);
let (entry_sender, receiver) = sync_channel(100);
let mut logger = Logger::new(event_receiver, entry_sender, hash(&gen.pkcs8));
for tx in gen.create_events() {
logger.log_event(tx).unwrap();
sender.send(tx).unwrap();
}
logger.process_events(Instant::now(), None).unwrap();
drop(logger.sender);
receiver.iter().collect::<Vec<_>>()
}