From 62a18d4c02f95fbfc850f586c0dbe32de643ac62 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Thu, 20 Sep 2018 15:02:24 -0700 Subject: [PATCH] step one of lastidnotfound: record_stage->record_service, trim recorder to hashes (#1281) step one of lastidnotfound * record_stage->record_service, trim recorder to hashes * doc updates, hash multiple without alloc() cc #1171 --- src/hash.rs | 15 +++-- src/lib.rs | 2 + src/poh.rs | 107 ++++++++++++++++++++++++++++++ src/poh_service.rs | 158 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 278 insertions(+), 4 deletions(-) create mode 100644 src/poh.rs create mode 100644 src/poh_service.rs diff --git a/src/hash.rs b/src/hash.rs index bdf21889c9..d78f69a96d 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -26,16 +26,23 @@ impl fmt::Display for Hash { write!(f, "{}", bs58::encode(self.0).into_string()) } } -/// Return a Sha256 hash for the given data. -pub fn hash(val: &[u8]) -> Hash { - let mut hasher = Sha256::default(); - hasher.input(val); +/// Return a Sha256 hash for the given data. +pub fn hashv(vals: &[&[u8]]) -> Hash { + let mut hasher = Sha256::default(); + for val in vals { + hasher.input(val); + } // At the time of this writing, the sha2 library is stuck on an old version // of generic_array (0.9.0). Decouple ourselves with a clone to our version. Hash(GenericArray::clone_from_slice(hasher.result().as_slice())) } +/// Return a Sha256 hash for the given data. +pub fn hash(val: &[u8]) -> Hash { + hashv(&[val]) +} + /// Return the hash of the given hash extended with the given value. pub fn extend_and_hash(id: &Hash, val: &[u8]) -> Hash { let mut hash_data = id.as_ref().to_vec(); diff --git a/src/lib.rs b/src/lib.rs index c9eecb8959..defb65975f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,6 +36,8 @@ pub mod ncp; pub mod netutil; pub mod packet; pub mod payment_plan; +pub mod poh; +pub mod poh_service; pub mod record_stage; pub mod recorder; pub mod recvmmsg; diff --git a/src/poh.rs b/src/poh.rs new file mode 100644 index 0000000000..6b96926359 --- /dev/null +++ b/src/poh.rs @@ -0,0 +1,107 @@ +//! The `Poh` module provides an object for generating a Proof of History. +//! It records Hashes items on behalf of its users. + +use hash::{hash, hashv, Hash}; +use std::time::{Duration, Instant}; + +pub struct Poh { + last_hash: Hash, + num_hashes: u64, + last_tick: Instant, + tick_duration: Option, +} + +#[derive(Debug)] +pub struct PohEntry { + pub num_hashes: u64, + pub id: Hash, + pub mixin: Option, +} + +impl Poh { + pub fn new(last_hash: Hash, tick_duration: Option) -> Self { + let last_tick = Instant::now(); + Poh { + last_hash, + num_hashes: 0, + last_tick, + tick_duration, + } + } + + pub fn hash(&mut self) { + self.last_hash = hash(&self.last_hash.as_ref()); + self.num_hashes += 1; + } + + pub fn record(&mut self, mixin: Hash) -> PohEntry { + let num_hashes = self.num_hashes + 1; + self.num_hashes = 0; + + self.last_hash = hashv(&[&self.last_hash.as_ref(), &mixin.as_ref()]); + + PohEntry { + num_hashes, + id: self.last_hash, + mixin: Some(mixin), + } + } + + // emissions of Ticks (i.e. PohEntries without a mixin) allows + // validators to parallelize the work of catching up + pub fn tick(&mut self) -> Option { + if let Some(tick_duration) = self.tick_duration { + if self.last_tick.elapsed() >= tick_duration { + self.last_tick = Instant::now(); + let entry = PohEntry { + num_hashes: self.num_hashes, + id: self.last_hash, + mixin: None, + }; + self.num_hashes = 0; + return Some(entry); + } + } + None + } +} + +pub fn verify(initial: Hash, entries: &[PohEntry]) -> bool { + let mut last_hash = initial; + + for entry in entries { + assert!(entry.num_hashes != 0); + for _ in 1..entry.num_hashes { + last_hash = hash(&last_hash.as_ref()); + } + let id = match entry.mixin { + Some(mixin) => hashv(&[&last_hash.as_ref(), &mixin.as_ref()]), + None => hash(&last_hash.as_ref()), + }; + if id != entry.id { + return false; + } + last_hash = id; + } + + return true; +} + +#[cfg(test)] +mod tests { + use hash::Hash; + use poh::{self, PohEntry}; + + #[test] + #[should_panic] + fn test_poh_verify_assert() { + poh::verify( + Hash::default(), + &[PohEntry { + num_hashes: 0, + id: Hash::default(), + mixin: None, + }], + ); + } +} diff --git a/src/poh_service.rs b/src/poh_service.rs new file mode 100644 index 0000000000..26bc004132 --- /dev/null +++ b/src/poh_service.rs @@ -0,0 +1,158 @@ +//! The `poh_service` module provides an object for generating a Proof of History. +//! It records Hashes items on behalf of its users. It continuously generates +//! new Hashes, only stopping to check if it has been sent a Hash to mix in +//! to the Poh. +//! +//! The returned Entry includes the mix-in request, the latest Poh Hash, and the +//! number of Hashes generated in the service since the last mix-in request. +//! +//! The resulting stream of Hashes represents ordered events in time. +//! +use hash::Hash; +use poh::{Poh, PohEntry}; +use service::Service; +use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError}; +use std::thread::{self, Builder, JoinHandle}; +use std::time::Duration; + +pub struct PohService { + thread_hdl: JoinHandle<()>, +} + +impl PohService { + /// A background thread that will continue tagging received Transaction messages and + /// sending back Entry messages until either the receiver or sender channel is closed. + pub fn new(start_hash: Hash, hash_receiver: Receiver) -> (Self, Receiver) { + let (poh_sender, poh_receiver) = channel(); + let thread_hdl = Builder::new() + .name("solana-record-service".to_string()) + .spawn(move || { + let mut poh = Poh::new(start_hash, None); + let _ = Self::process_hashes(&mut poh, &hash_receiver, &poh_sender); + }).unwrap(); + + (PohService { thread_hdl }, poh_receiver) + } + + /// Same as `PohService::new`, but will automatically produce entries every `tick_duration`. + pub fn new_with_clock( + start_hash: Hash, + hash_receiver: Receiver, + tick_duration: Duration, + ) -> (Self, Receiver) { + let (poh_sender, poh_receiver) = channel(); + let thread_hdl = Builder::new() + .name("solana-record-service".to_string()) + .spawn(move || { + let mut poh = Poh::new(start_hash, Some(tick_duration)); + loop { + if Self::try_process_hashes(&mut poh, &hash_receiver, &poh_sender).is_err() { + return; + } + poh.hash(); + } + }).unwrap(); + + (PohService { thread_hdl }, poh_receiver) + } + + fn process_hash(hash: Hash, poh: &mut Poh, sender: &Sender) -> Result<(), ()> { + let resp = poh.record(hash); + sender.send(resp).or(Err(()))?; + Ok(()) + } + + fn process_hashes( + poh: &mut Poh, + receiver: &Receiver, + sender: &Sender, + ) -> Result<(), ()> { + loop { + match receiver.recv() { + Ok(hash) => Self::process_hash(hash, poh, sender)?, + Err(RecvError) => return Err(()), + } + } + } + + fn try_process_hashes( + poh: &mut Poh, + receiver: &Receiver, + sender: &Sender, + ) -> Result<(), ()> { + loop { + if let Some(resp) = poh.tick() { + sender.send(resp).or(Err(()))?; + } + match receiver.try_recv() { + Ok(hash) => Self::process_hash(hash, poh, sender)?, + Err(TryRecvError::Empty) => return Ok(()), + Err(TryRecvError::Disconnected) => return Err(()), + }; + } + } +} + +impl Service for PohService { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use poh::verify; + use std::sync::mpsc::channel; + use std::thread::sleep; + + #[test] + fn test_poh() { + let (hash_sender, hash_receiver) = channel(); + let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver); + + hash_sender.send(Hash::default()).unwrap(); + sleep(Duration::from_millis(1)); + hash_sender.send(Hash::default()).unwrap(); + sleep(Duration::from_millis(1)); + hash_sender.send(Hash::default()).unwrap(); + + let entry0 = poh_receiver.recv().unwrap(); + let entry1 = poh_receiver.recv().unwrap(); + let entry2 = poh_receiver.recv().unwrap(); + + assert_eq!(entry0.num_hashes, 1); + assert_eq!(entry0.num_hashes, 1); + assert_eq!(entry0.num_hashes, 1); + + drop(hash_sender); + assert_eq!(poh_service.thread_hdl.join().unwrap(), ()); + + assert!(verify(Hash::default(), &[entry0, entry1, entry2])); + } + + #[test] + fn test_poh_closed_sender() { + let (hash_sender, hash_receiver) = channel(); + let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver); + drop(poh_receiver); + hash_sender.send(Hash::default()).unwrap(); + assert_eq!(poh_service.thread_hdl.join().unwrap(), ()); + } + + #[test] + fn test_poh_clock() { + let (hash_sender, hash_receiver) = channel(); + let (_poh_service, poh_receiver) = + PohService::new_with_clock(Hash::default(), hash_receiver, Duration::from_millis(1)); + + sleep(Duration::from_millis(3)); + drop(hash_sender); + let pohs: Vec<_> = poh_receiver.iter().map(|x| x).collect(); + assert!(pohs.len() > 1); + + assert!(verify(Hash::default(), &pohs)); + } +}