Add historian

A microservice that continuously generates hashes, only stopping to
tag messages with the latest hash.

Fixes #8
This commit is contained in:
Greg Fitzgerald 2018-02-17 20:51:41 -07:00
parent 3550f703c3
commit 831e2cbdc9
3 changed files with 128 additions and 0 deletions

View File

@ -12,6 +12,7 @@
/// Though processing power varies across nodes, the network gives priority to the
/// fastest processor. Duration should therefore be estimated by assuming that the hash
/// was generated by the fastest processor at the time the entry was logged.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Event {
pub num_hashes: u64,
pub end_hash: u64,
@ -23,6 +24,7 @@ pub struct Event {
/// be generated in 'num_hashes' hashes and verified in 'num_hashes' hashes. By logging
/// a hash alongside the tick, each tick and be verified in parallel using the 'end_hash'
/// of the preceding tick to seed its hashing.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum EventData {
Tick,
UserDataKey(u64),

125
src/historian.rs Normal file
View File

@ -0,0 +1,125 @@
//! The `historian` crate provides a microservice for generating a Proof-of-History.
//! It logs EventData items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an EventData item. It
//! tags each EventData with an Event and sends it back. The Event includes the
//! EventData, the latest hash, and the number of hashes since the last event.
//! The resulting Event stream represents ordered events in time.
use std::thread::JoinHandle;
use std::sync::mpsc::{Receiver, Sender};
use event::{Event, EventData};
pub struct Historian {
pub sender: Sender<EventData>,
pub receiver: Receiver<Event>,
pub thread_hdl: JoinHandle<(Event, EventThreadExitReason)>,
}
pub enum EventThreadExitReason {
RecvDisconnected,
SendDisconnected,
}
fn drain_queue(
receiver: &Receiver<EventData>,
sender: &Sender<Event>,
num_hashes: u64,
end_hash: u64,
) -> Result<u64, (Event, EventThreadExitReason)> {
use std::sync::mpsc::TryRecvError;
let mut num_hashes = num_hashes;
loop {
match receiver.try_recv() {
Ok(data) => {
let e = Event {
end_hash,
num_hashes,
data: data.clone(),
};
if let Err(_) = sender.send(e) {
let e = Event {
end_hash,
num_hashes,
data,
};
return Err((e, EventThreadExitReason::SendDisconnected));
}
num_hashes = 0;
}
Err(TryRecvError::Empty) => {
return Ok(num_hashes);
}
Err(TryRecvError::Disconnected) => {
let e = Event {
end_hash,
num_hashes,
data: EventData::Tick,
};
return Err((e, EventThreadExitReason::RecvDisconnected));
}
}
}
}
/// A background thread that will continue tagging received EventData messages and
/// sending back Event messages until either the receiver or sender channel is closed.
pub fn event_stream(
start_hash: u64,
receiver: Receiver<EventData>,
sender: Sender<Event>,
) -> JoinHandle<(Event, EventThreadExitReason)> {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::thread;
thread::spawn(move || {
let mut end_hash = start_hash;
let mut hasher = DefaultHasher::new();
let mut num_hashes = 0;
loop {
match drain_queue(&receiver, &sender, num_hashes, end_hash) {
Ok(n) => num_hashes = n,
Err(e) => return e,
}
end_hash.hash(&mut hasher);
end_hash = hasher.finish();
num_hashes += 1;
}
})
}
impl Historian {
pub fn new(start_hash: u64) -> Self {
use std::sync::mpsc::channel;
let (sender, event_data_receiver) = channel();
let (event_sender, receiver) = channel();
let thread_hdl = event_stream(start_hash, event_data_receiver, event_sender);
Historian {
sender,
receiver,
thread_hdl,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use event::*;
#[test]
fn test_historian() {
let hist = Historian::new(0);
let data = EventData::Tick;
hist.sender.send(data.clone()).unwrap();
let e0 = hist.receiver.recv().unwrap();
assert_eq!(e0.data, data);
let data = EventData::UserDataKey(0xdeadbeef);
hist.sender.send(data.clone()).unwrap();
let e1 = hist.receiver.recv().unwrap();
assert_eq!(e1.data, data);
verify_slice(&[e0, e1], 0);
}
}

View File

@ -1,4 +1,5 @@
#![cfg_attr(feature = "unstable", feature(test))]
pub mod event;
pub mod historian;
extern crate itertools;
extern crate rayon;