step one of lastidnotfound: record_stage->record_service, trim recorder to hashes (#1281)

step one of lastidnotfound

* record_stage->record_service, trim recorder to hashes
* doc updates, hash multiple without alloc()

cc #1171
This commit is contained in:
Rob Walker 2018-09-20 15:02:24 -07:00 committed by GitHub
parent a6c15684c9
commit 62a18d4c02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 278 additions and 4 deletions

View File

@ -26,16 +26,23 @@ impl fmt::Display for Hash {
write!(f, "{}", bs58::encode(self.0).into_string())
}
}
/// Return a Sha256 hash for the given data.
pub fn hash(val: &[u8]) -> Hash {
let mut hasher = Sha256::default();
hasher.input(val);
/// Return a Sha256 hash for the given data.
pub fn hashv(vals: &[&[u8]]) -> Hash {
let mut hasher = Sha256::default();
for val in vals {
hasher.input(val);
}
// At the time of this writing, the sha2 library is stuck on an old version
// of generic_array (0.9.0). Decouple ourselves with a clone to our version.
Hash(GenericArray::clone_from_slice(hasher.result().as_slice()))
}
/// Return a Sha256 hash for the given data.
pub fn hash(val: &[u8]) -> Hash {
hashv(&[val])
}
/// Return the hash of the given hash extended with the given value.
pub fn extend_and_hash(id: &Hash, val: &[u8]) -> Hash {
let mut hash_data = id.as_ref().to_vec();

View File

@ -36,6 +36,8 @@ pub mod ncp;
pub mod netutil;
pub mod packet;
pub mod payment_plan;
pub mod poh;
pub mod poh_service;
pub mod record_stage;
pub mod recorder;
pub mod recvmmsg;

107
src/poh.rs Normal file
View File

@ -0,0 +1,107 @@
//! The `Poh` module provides an object for generating a Proof of History.
//! It records Hashes items on behalf of its users.
use hash::{hash, hashv, Hash};
use std::time::{Duration, Instant};
pub struct Poh {
last_hash: Hash,
num_hashes: u64,
last_tick: Instant,
tick_duration: Option<Duration>,
}
#[derive(Debug)]
pub struct PohEntry {
pub num_hashes: u64,
pub id: Hash,
pub mixin: Option<Hash>,
}
impl Poh {
pub fn new(last_hash: Hash, tick_duration: Option<Duration>) -> Self {
let last_tick = Instant::now();
Poh {
last_hash,
num_hashes: 0,
last_tick,
tick_duration,
}
}
pub fn hash(&mut self) {
self.last_hash = hash(&self.last_hash.as_ref());
self.num_hashes += 1;
}
pub fn record(&mut self, mixin: Hash) -> PohEntry {
let num_hashes = self.num_hashes + 1;
self.num_hashes = 0;
self.last_hash = hashv(&[&self.last_hash.as_ref(), &mixin.as_ref()]);
PohEntry {
num_hashes,
id: self.last_hash,
mixin: Some(mixin),
}
}
// emissions of Ticks (i.e. PohEntries without a mixin) allows
// validators to parallelize the work of catching up
pub fn tick(&mut self) -> Option<PohEntry> {
if let Some(tick_duration) = self.tick_duration {
if self.last_tick.elapsed() >= tick_duration {
self.last_tick = Instant::now();
let entry = PohEntry {
num_hashes: self.num_hashes,
id: self.last_hash,
mixin: None,
};
self.num_hashes = 0;
return Some(entry);
}
}
None
}
}
pub fn verify(initial: Hash, entries: &[PohEntry]) -> bool {
let mut last_hash = initial;
for entry in entries {
assert!(entry.num_hashes != 0);
for _ in 1..entry.num_hashes {
last_hash = hash(&last_hash.as_ref());
}
let id = match entry.mixin {
Some(mixin) => hashv(&[&last_hash.as_ref(), &mixin.as_ref()]),
None => hash(&last_hash.as_ref()),
};
if id != entry.id {
return false;
}
last_hash = id;
}
return true;
}
#[cfg(test)]
mod tests {
use hash::Hash;
use poh::{self, PohEntry};
#[test]
#[should_panic]
fn test_poh_verify_assert() {
poh::verify(
Hash::default(),
&[PohEntry {
num_hashes: 0,
id: Hash::default(),
mixin: None,
}],
);
}
}

158
src/poh_service.rs Normal file
View File

@ -0,0 +1,158 @@
//! The `poh_service` module provides an object for generating a Proof of History.
//! It records Hashes items on behalf of its users. It continuously generates
//! new Hashes, only stopping to check if it has been sent a Hash to mix in
//! to the Poh.
//!
//! The returned Entry includes the mix-in request, the latest Poh Hash, and the
//! number of Hashes generated in the service since the last mix-in request.
//!
//! The resulting stream of Hashes represents ordered events in time.
//!
use hash::Hash;
use poh::{Poh, PohEntry};
use service::Service;
use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
pub struct PohService {
thread_hdl: JoinHandle<()>,
}
impl PohService {
/// A background thread that will continue tagging received Transaction messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new(start_hash: Hash, hash_receiver: Receiver<Hash>) -> (Self, Receiver<PohEntry>) {
let (poh_sender, poh_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-record-service".to_string())
.spawn(move || {
let mut poh = Poh::new(start_hash, None);
let _ = Self::process_hashes(&mut poh, &hash_receiver, &poh_sender);
}).unwrap();
(PohService { thread_hdl }, poh_receiver)
}
/// Same as `PohService::new`, but will automatically produce entries every `tick_duration`.
pub fn new_with_clock(
start_hash: Hash,
hash_receiver: Receiver<Hash>,
tick_duration: Duration,
) -> (Self, Receiver<PohEntry>) {
let (poh_sender, poh_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-record-service".to_string())
.spawn(move || {
let mut poh = Poh::new(start_hash, Some(tick_duration));
loop {
if Self::try_process_hashes(&mut poh, &hash_receiver, &poh_sender).is_err() {
return;
}
poh.hash();
}
}).unwrap();
(PohService { thread_hdl }, poh_receiver)
}
fn process_hash(hash: Hash, poh: &mut Poh, sender: &Sender<PohEntry>) -> Result<(), ()> {
let resp = poh.record(hash);
sender.send(resp).or(Err(()))?;
Ok(())
}
fn process_hashes(
poh: &mut Poh,
receiver: &Receiver<Hash>,
sender: &Sender<PohEntry>,
) -> Result<(), ()> {
loop {
match receiver.recv() {
Ok(hash) => Self::process_hash(hash, poh, sender)?,
Err(RecvError) => return Err(()),
}
}
}
fn try_process_hashes(
poh: &mut Poh,
receiver: &Receiver<Hash>,
sender: &Sender<PohEntry>,
) -> Result<(), ()> {
loop {
if let Some(resp) = poh.tick() {
sender.send(resp).or(Err(()))?;
}
match receiver.try_recv() {
Ok(hash) => Self::process_hash(hash, poh, sender)?,
Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(()),
};
}
}
}
impl Service for PohService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
#[cfg(test)]
mod tests {
use super::*;
use poh::verify;
use std::sync::mpsc::channel;
use std::thread::sleep;
#[test]
fn test_poh() {
let (hash_sender, hash_receiver) = channel();
let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver);
hash_sender.send(Hash::default()).unwrap();
sleep(Duration::from_millis(1));
hash_sender.send(Hash::default()).unwrap();
sleep(Duration::from_millis(1));
hash_sender.send(Hash::default()).unwrap();
let entry0 = poh_receiver.recv().unwrap();
let entry1 = poh_receiver.recv().unwrap();
let entry2 = poh_receiver.recv().unwrap();
assert_eq!(entry0.num_hashes, 1);
assert_eq!(entry0.num_hashes, 1);
assert_eq!(entry0.num_hashes, 1);
drop(hash_sender);
assert_eq!(poh_service.thread_hdl.join().unwrap(), ());
assert!(verify(Hash::default(), &[entry0, entry1, entry2]));
}
#[test]
fn test_poh_closed_sender() {
let (hash_sender, hash_receiver) = channel();
let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver);
drop(poh_receiver);
hash_sender.send(Hash::default()).unwrap();
assert_eq!(poh_service.thread_hdl.join().unwrap(), ());
}
#[test]
fn test_poh_clock() {
let (hash_sender, hash_receiver) = channel();
let (_poh_service, poh_receiver) =
PohService::new_with_clock(Hash::default(), hash_receiver, Duration::from_millis(1));
sleep(Duration::from_millis(3));
drop(hash_sender);
let pohs: Vec<_> = poh_receiver.iter().map(|x| x).collect();
assert!(pohs.len() > 1);
assert!(verify(Hash::default(), &pohs));
}
}