diff --git a/src/accountant.rs b/src/accountant.rs index 770494e6c2..cc9503f3e4 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -167,7 +167,7 @@ mod tests { drop(acc.historian.sender); assert_eq!( - acc.historian.thread_hdl.join().unwrap().1, + acc.historian.thread_hdl.join().unwrap(), ExitReason::RecvDisconnected ); } @@ -191,7 +191,7 @@ mod tests { drop(acc.historian.sender); assert_eq!( - acc.historian.thread_hdl.join().unwrap().1, + acc.historian.thread_hdl.join().unwrap(), ExitReason::RecvDisconnected ); } @@ -209,7 +209,7 @@ mod tests { drop(acc.historian.sender); assert_eq!( - acc.historian.thread_hdl.join().unwrap().1, + acc.historian.thread_hdl.join().unwrap(), ExitReason::RecvDisconnected ); } diff --git a/src/historian.rs b/src/historian.rs index 8e88f2acd8..7f87e0f943 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -14,7 +14,7 @@ use std::fmt::Debug; pub struct Historian { pub sender: SyncSender>, pub receiver: Receiver>, - pub thread_hdl: JoinHandle<(Entry, ExitReason)>, + pub thread_hdl: JoinHandle, pub signatures: HashSet, } @@ -40,12 +40,12 @@ impl Historian { ms_per_tick: Option, receiver: Receiver>, sender: SyncSender>, - ) -> JoinHandle<(Entry, ExitReason)> { + ) -> JoinHandle { spawn(move || { let mut logger = Logger::new(receiver, sender, start_hash); let now = Instant::now(); loop { - if let Err(err) = logger.log_events(now, ms_per_tick) { + if let Err(err) = logger.process_events(now, ms_per_tick) { return err; } logger.last_id = hash(&logger.last_id); @@ -90,7 +90,7 @@ mod tests { drop(hist.sender); assert_eq!( - hist.thread_hdl.join().unwrap().1, + hist.thread_hdl.join().unwrap(), ExitReason::RecvDisconnected ); @@ -104,7 +104,7 @@ mod tests { drop(hist.receiver); hist.sender.send(Event::Tick).unwrap(); assert_eq!( - hist.thread_hdl.join().unwrap().1, + hist.thread_hdl.join().unwrap(), ExitReason::SendDisconnected ); } @@ -127,15 +127,14 @@ mod tests { let hist = Historian::new(&zero, Some(20)); sleep(Duration::from_millis(30)); hist.sender.send(Event::Tick).unwrap(); - sleep(Duration::from_millis(15)); drop(hist.sender); - assert_eq!( - hist.thread_hdl.join().unwrap().1, - ExitReason::RecvDisconnected - ); - let entries: Vec> = hist.receiver.iter().collect(); - assert!(entries.len() > 1); - assert!(verify_slice(&entries, &zero)); + + // Ensure one entry is sent back for each tick sent in. + assert_eq!(entries.len(), 1); + + // Ensure the ID is not the seed, which indicates another Tick + // was logged before the one we sent. + assert_ne!(entries[0].id, zero); } } diff --git a/src/logger.rs b/src/logger.rs index 67c812419d..a4def4a8d5 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -41,19 +41,16 @@ impl Logger { } } - pub fn log_event(&mut self, event: Event) -> Result<(), (Entry, ExitReason)> { + pub fn log_event(&mut self, event: Event) -> Result, ExitReason> { let entry = create_entry_mut(&mut self.last_id, &mut self.num_hashes, event); - if let Err(_) = self.sender.send(entry.clone()) { - return Err((entry, ExitReason::SendDisconnected)); - } - Ok(()) + Ok(entry) } - pub fn log_events( + pub fn process_events( &mut self, epoch: Instant, ms_per_tick: Option, - ) -> Result<(), (Entry, ExitReason)> { + ) -> Result<(), ExitReason> { loop { if let Some(ms) = ms_per_tick { if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) { @@ -61,22 +58,17 @@ impl Logger { self.num_ticks += 1; } } + match self.receiver.try_recv() { Ok(event) => { - self.log_event(event)?; + let entry = self.log_event(event)?; + self.sender + .send(entry) + .or(Err(ExitReason::SendDisconnected))?; } - Err(TryRecvError::Empty) => { - return Ok(()); - } - Err(TryRecvError::Disconnected) => { - let entry = Entry { - id: self.last_id, - num_hashes: self.num_hashes, - event: Event::Tick, - }; - return Err((entry, ExitReason::RecvDisconnected)); - } - } + Err(TryRecvError::Empty) => return Ok(()), + Err(TryRecvError::Disconnected) => return Err(ExitReason::RecvDisconnected), + }; } } } @@ -98,12 +90,13 @@ mod tests { } fn run_genesis(gen: Genesis) -> Vec> { - let (_sender, event_receiver) = sync_channel(100); + let (sender, event_receiver) = sync_channel(100); let (entry_sender, receiver) = sync_channel(100); let mut logger = Logger::new(event_receiver, entry_sender, hash(&gen.pkcs8)); for tx in gen.create_events() { - logger.log_event(tx).unwrap(); + sender.send(tx).unwrap(); } + logger.process_events(Instant::now(), None).unwrap(); drop(logger.sender); receiver.iter().collect::>() }