diff --git a/src/accountant.rs b/src/accountant.rs index 69966c49d..ebaec39c4 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -152,7 +152,7 @@ impl Accountant { mod tests { use super::*; use event::{generate_keypair, get_pubkey}; - use historian::ExitReason; + use logger::ExitReason; #[test] fn test_accountant() { diff --git a/src/historian.rs b/src/historian.rs index 515a5746e..446570642 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -1,16 +1,13 @@ //! The `historian` crate provides a microservice for generating a Proof-of-History. -//! It logs Event items on behalf of its users. It continuously generates -//! new hashes, only stopping to check if it has been sent an Event item. It -//! tags each Event with an Entry and sends it back. The Entry includes the -//! Event, the latest hash, and the number of hashes since the last event. -//! The resulting stream of entries represents ordered events in time. +//! It manages a thread containing a Proof-of-History Logger. use std::thread::JoinHandle; use std::collections::HashSet; use std::sync::mpsc::{Receiver, SyncSender}; -use std::time::{Duration, SystemTime}; -use log::{hash, hash_event, Entry, Sha256Hash}; -use event::{get_signature, verify_event, Event, Signature}; +use std::time::Instant; +use log::{hash, Entry, Sha256Hash}; +use logger::{verify_event_and_reserve_signature, ExitReason, Logger}; +use event::{Event, Signature}; use serde::Serialize; use std::fmt::Debug; @@ -21,121 +18,13 @@ pub struct Historian { pub signatures: HashSet, } -#[derive(Debug, PartialEq, Eq)] -pub enum ExitReason { - RecvDisconnected, - SendDisconnected, -} -fn log_event( - sender: &SyncSender>, - num_hashes: &mut u64, - end_hash: &mut Sha256Hash, - event: Event, -) -> Result<(), (Entry, ExitReason)> { - *end_hash = hash_event(end_hash, &event); - let entry = Entry { - end_hash: *end_hash, - num_hashes: *num_hashes, - event, - }; - if let Err(_) = sender.send(entry.clone()) { - return Err((entry, ExitReason::SendDisconnected)); - } - *num_hashes = 0; - Ok(()) -} - -fn verify_event_and_reserve_signature( - signatures: &mut HashSet, - event: &Event, -) -> bool { - if !verify_event(&event) { - return false; - } - if let Some(sig) = get_signature(&event) { - if signatures.contains(&sig) { - return false; - } - signatures.insert(sig); - } - true -} - -fn log_events( - receiver: &Receiver>, - sender: &SyncSender>, - num_hashes: &mut u64, - end_hash: &mut Sha256Hash, - epoch: SystemTime, - num_ticks: &mut u64, - ms_per_tick: Option, -) -> Result<(), (Entry, ExitReason)> { - use std::sync::mpsc::TryRecvError; - loop { - if let Some(ms) = ms_per_tick { - let now = SystemTime::now(); - if now > epoch + Duration::from_millis((*num_ticks + 1) * ms) { - log_event(sender, num_hashes, end_hash, Event::Tick)?; - *num_ticks += 1; - } - } - match receiver.try_recv() { - Ok(event) => { - log_event(sender, num_hashes, end_hash, event)?; - } - Err(TryRecvError::Empty) => { - return Ok(()); - } - Err(TryRecvError::Disconnected) => { - let entry = Entry { - end_hash: *end_hash, - num_hashes: *num_hashes, - event: Event::Tick, - }; - return Err((entry, ExitReason::RecvDisconnected)); - } - } - } -} - -/// A background thread that will continue tagging received Event messages and -/// sending back Entry messages until either the receiver or sender channel is closed. -pub fn create_logger( - start_hash: Sha256Hash, - ms_per_tick: Option, - receiver: Receiver>, - sender: SyncSender>, -) -> JoinHandle<(Entry, ExitReason)> { - use std::thread; - thread::spawn(move || { - let mut end_hash = start_hash; - let mut num_hashes = 0; - let mut num_ticks = 0; - let epoch = SystemTime::now(); - loop { - if let Err(err) = log_events( - &receiver, - &sender, - &mut num_hashes, - &mut end_hash, - epoch, - &mut num_ticks, - ms_per_tick, - ) { - return err; - } - end_hash = hash(&end_hash); - num_hashes += 1; - } - }) -} - impl Historian { pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option) -> Self { use std::sync::mpsc::sync_channel; let (sender, event_receiver) = sync_channel(1000); let (entry_sender, receiver) = sync_channel(1000); - let thread_hdl = create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender); + let thread_hdl = + Historian::create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender); let signatures = HashSet::new(); Historian { sender, @@ -144,9 +33,32 @@ impl Historian { signatures, } } + pub fn verify_event(self: &mut Self, event: &Event) -> bool { return verify_event_and_reserve_signature(&mut self.signatures, event); } + + /// A background thread that will continue tagging received Event messages and + /// sending back Entry messages until either the receiver or sender channel is closed. + fn create_logger( + start_hash: Sha256Hash, + ms_per_tick: Option, + receiver: Receiver>, + sender: SyncSender>, + ) -> JoinHandle<(Entry, ExitReason)> { + use std::thread; + thread::spawn(move || { + let mut logger = Logger::new(receiver, sender, start_hash); + let now = Instant::now(); + loop { + if let Err(err) = logger.log_events(now, ms_per_tick) { + return err; + } + logger.end_hash = hash(&logger.end_hash); + logger.num_hashes += 1; + } + }) + } } #[cfg(test)] @@ -210,26 +122,4 @@ mod tests { assert!(entries.len() > 1); assert!(verify_slice(&entries, &zero)); } - - #[test] - fn test_bad_event_signature() { - let keypair = generate_keypair(); - let sig = sign_claim_data(&hash(b"hello, world"), &keypair); - let event0 = Event::new_claim(get_pubkey(&keypair), hash(b"goodbye cruel world"), sig); - let mut sigs = HashSet::new(); - assert!(!verify_event_and_reserve_signature(&mut sigs, &event0)); - assert!(!sigs.contains(&sig)); - } - - #[test] - fn test_duplicate_event_signature() { - let keypair = generate_keypair(); - let to = get_pubkey(&keypair); - let data = &hash(b"hello, world"); - let sig = sign_claim_data(data, &keypair); - let event0 = Event::new_claim(to, data, sig); - let mut sigs = HashSet::new(); - assert!(verify_event_and_reserve_signature(&mut sigs, &event0)); - assert!(!verify_event_and_reserve_signature(&mut sigs, &event0)); - } } diff --git a/src/lib.rs b/src/lib.rs index 851262000..588c79849 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![cfg_attr(feature = "unstable", feature(test))] pub mod log; +pub mod logger; pub mod event; pub mod historian; pub mod accountant; diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 000000000..ce8040711 --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,135 @@ +//! The `logger` crate provides an object for generating a Proof-of-History. +//! It logs Event items on behalf of its users. It continuously generates +//! new hashes, only stopping to check if it has been sent an Event item. It +//! tags each Event with an Entry and sends it back. The Entry includes the +//! Event, the latest hash, and the number of hashes since the last event. +//! The resulting stream of entries represents ordered events in time. + +use std::collections::HashSet; +use std::sync::mpsc::{Receiver, SyncSender}; +use std::time::{Duration, Instant}; +use log::{hash_event, Entry, Sha256Hash}; +use event::{get_signature, verify_event, Event, Signature}; +use serde::Serialize; +use std::fmt::Debug; + +#[derive(Debug, PartialEq, Eq)] +pub enum ExitReason { + RecvDisconnected, + SendDisconnected, +} + +pub struct Logger { + pub sender: SyncSender>, + pub receiver: Receiver>, + pub end_hash: Sha256Hash, + pub num_hashes: u64, + pub num_ticks: u64, +} + +pub fn verify_event_and_reserve_signature( + signatures: &mut HashSet, + event: &Event, +) -> bool { + if !verify_event(&event) { + return false; + } + if let Some(sig) = get_signature(&event) { + if signatures.contains(&sig) { + return false; + } + signatures.insert(sig); + } + true +} + +impl Logger { + pub fn new( + receiver: Receiver>, + sender: SyncSender>, + start_hash: Sha256Hash, + ) -> Self { + Logger { + receiver, + sender, + end_hash: start_hash, + num_hashes: 0, + num_ticks: 0, + } + } + + pub fn log_event(&mut self, event: Event) -> Result<(), (Entry, ExitReason)> { + self.end_hash = hash_event(&self.end_hash, &event); + let entry = Entry { + end_hash: self.end_hash, + num_hashes: self.num_hashes, + event, + }; + if let Err(_) = self.sender.send(entry.clone()) { + return Err((entry, ExitReason::SendDisconnected)); + } + self.num_hashes = 0; + Ok(()) + } + + pub fn log_events( + &mut self, + epoch: Instant, + ms_per_tick: Option, + ) -> Result<(), (Entry, ExitReason)> { + use std::sync::mpsc::TryRecvError; + loop { + if let Some(ms) = ms_per_tick { + if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) { + self.log_event(Event::Tick)?; + self.num_ticks += 1; + } + } + match self.receiver.try_recv() { + Ok(event) => { + self.log_event(event)?; + } + Err(TryRecvError::Empty) => { + return Ok(()); + } + Err(TryRecvError::Disconnected) => { + let entry = Entry { + end_hash: self.end_hash, + num_hashes: self.num_hashes, + event: Event::Tick, + }; + return Err((entry, ExitReason::RecvDisconnected)); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use log::*; + use event::*; + + #[test] + fn test_bad_event_signature() { + let keypair = generate_keypair(); + let sig = sign_claim_data(&hash(b"hello, world"), &keypair); + let event0 = Event::new_claim(get_pubkey(&keypair), hash(b"goodbye cruel world"), sig); + let mut sigs = HashSet::new(); + assert!(!verify_event_and_reserve_signature(&mut sigs, &event0)); + assert!(!sigs.contains(&sig)); + } + + #[test] + fn test_duplicate_event_signature() { + let keypair = generate_keypair(); + let to = get_pubkey(&keypair); + let data = &hash(b"hello, world"); + let sig = sign_claim_data(data, &keypair); + let event0 = Event::new_claim(to, data, sig); + let mut sigs = HashSet::new(); + assert!(verify_event_and_reserve_signature(&mut sigs, &event0)); + assert!(!verify_event_and_reserve_signature(&mut sigs, &event0)); + } +}