Merge pull request #71 from garious/rework-recorder

Replicate the ledger
This commit is contained in:
Greg Fitzgerald 2018-03-21 17:23:55 -06:00 committed by GitHub
commit fde320e2f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 101 additions and 72 deletions

View File

@ -44,7 +44,6 @@ pub struct Accountant {
pub historian: Historian, pub historian: Historian,
pub balances: HashMap<PublicKey, i64>, pub balances: HashMap<PublicKey, i64>,
pub first_id: Hash, pub first_id: Hash,
pub last_id: Hash,
pending: HashMap<Signature, Plan>, pending: HashMap<Signature, Plan>,
time_sources: HashSet<PublicKey>, time_sources: HashSet<PublicKey>,
last_time: DateTime<Utc>, last_time: DateTime<Utc>,
@ -67,7 +66,6 @@ impl Accountant {
historian: hist, historian: hist,
balances: HashMap::new(), balances: HashMap::new(),
first_id: start_hash, first_id: start_hash,
last_id: start_hash,
pending: HashMap::new(), pending: HashMap::new(),
time_sources: HashSet::new(), time_sources: HashSet::new(),
last_time: Utc.timestamp(0, 0), last_time: Utc.timestamp(0, 0),
@ -91,13 +89,6 @@ impl Accountant {
Self::new_from_entries(mint.create_entries(), ms_per_tick) Self::new_from_entries(mint.create_entries(), ms_per_tick)
} }
pub fn sync(self: &mut Self) -> Hash {
while let Ok(entry) = self.historian.receiver.try_recv() {
self.last_id = entry.id;
}
self.last_id
}
fn is_deposit(allow_deposits: bool, from: &PublicKey, plan: &Plan) -> bool { fn is_deposit(allow_deposits: bool, from: &PublicKey, plan: &Plan) -> bool {
if let Plan::Pay(ref payment) = *plan { if let Plan::Pay(ref payment) = *plan {
allow_deposits && *from == payment.to allow_deposits && *from == payment.to
@ -210,8 +201,9 @@ impl Accountant {
n: i64, n: i64,
keypair: &KeyPair, keypair: &KeyPair,
to: PublicKey, to: PublicKey,
last_id: Hash,
) -> Result<Signature> { ) -> Result<Signature> {
let tr = Transaction::new(keypair, to, n, self.last_id); let tr = Transaction::new(keypair, to, n, last_id);
let sig = tr.sig; let sig = tr.sig;
self.process_transaction(tr).map(|_| sig) self.process_transaction(tr).map(|_| sig)
} }
@ -222,8 +214,9 @@ impl Accountant {
keypair: &KeyPair, keypair: &KeyPair,
to: PublicKey, to: PublicKey,
dt: DateTime<Utc>, dt: DateTime<Utc>,
last_id: Hash,
) -> Result<Signature> { ) -> Result<Signature> {
let tr = Transaction::new_on_date(keypair, to, dt, n, self.last_id); let tr = Transaction::new_on_date(keypair, to, dt, n, last_id);
let sig = tr.sig; let sig = tr.sig;
self.process_transaction(tr).map(|_| sig) self.process_transaction(tr).map(|_| sig)
} }
@ -244,10 +237,12 @@ mod tests {
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let mut acc = Accountant::new(&alice, Some(2)); let mut acc = Accountant::new(&alice, Some(2));
acc.transfer(1_000, &alice.keypair(), bob_pubkey).unwrap(); acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed())
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
acc.transfer(500, &alice.keypair(), bob_pubkey).unwrap(); acc.transfer(500, &alice.keypair(), bob_pubkey, alice.seed())
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500);
drop(acc.historian.sender); drop(acc.historian.sender);
@ -262,9 +257,10 @@ mod tests {
let alice = Mint::new(11_000); let alice = Mint::new(11_000);
let mut acc = Accountant::new(&alice, Some(2)); let mut acc = Accountant::new(&alice, Some(2));
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
acc.transfer(1_000, &alice.keypair(), bob_pubkey).unwrap(); acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed())
.unwrap();
assert_eq!( assert_eq!(
acc.transfer(10_001, &alice.keypair(), bob_pubkey), acc.transfer(10_001, &alice.keypair(), bob_pubkey, alice.seed()),
Err(AccountingError::InsufficientFunds) Err(AccountingError::InsufficientFunds)
); );
@ -309,7 +305,8 @@ mod tests {
let mut acc = Accountant::new(&alice, Some(2)); let mut acc = Accountant::new(&alice, Some(2));
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); acc.transfer(500, &alice_keypair, bob_pubkey, alice.seed())
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);
drop(acc.historian.sender); drop(acc.historian.sender);
@ -326,7 +323,7 @@ mod tests {
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt) acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed())
.unwrap(); .unwrap();
// Alice's balance will be zero because all funds are locked up. // Alice's balance will be zero because all funds are locked up.
@ -355,7 +352,7 @@ mod tests {
acc.process_verified_timestamp(alice.pubkey(), dt).unwrap(); acc.process_verified_timestamp(alice.pubkey(), dt).unwrap();
// It's now past now, so this transfer should be processed immediately. // It's now past now, so this transfer should be processed immediately.
acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt) acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed())
.unwrap(); .unwrap();
assert_eq!(acc.get_balance(&alice.pubkey()), Some(0)); assert_eq!(acc.get_balance(&alice.pubkey()), Some(0));
@ -369,7 +366,7 @@ mod tests {
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
let sig = acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt) let sig = acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed())
.unwrap(); .unwrap();
// Alice's balance will be zero because all funds are locked up. // Alice's balance will be zero because all funds are locked up.

View File

@ -12,9 +12,12 @@ use std::time::Duration;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::default::Default; use std::default::Default;
use serde_json;
pub struct AccountantSkel { pub struct AccountantSkel {
pub acc: Accountant, pub acc: Accountant,
pub last_id: Hash,
pub ledger: Vec<Entry>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -34,7 +37,21 @@ pub enum Response {
impl AccountantSkel { impl AccountantSkel {
pub fn new(acc: Accountant) -> Self { pub fn new(acc: Accountant) -> Self {
AccountantSkel { acc } let last_id = acc.first_id;
AccountantSkel {
acc,
last_id,
ledger: vec![],
}
}
pub fn sync(self: &mut Self) -> Hash {
while let Ok(entry) = self.acc.historian.receiver.try_recv() {
self.last_id = entry.id;
println!("{}", serde_json::to_string(&entry).unwrap());
self.ledger.push(entry);
}
self.last_id
} }
pub fn process_request(self: &mut Self, msg: Request) -> Option<Response> { pub fn process_request(self: &mut Self, msg: Request) -> Option<Response> {
@ -49,11 +66,20 @@ impl AccountantSkel {
let val = self.acc.get_balance(&key); let val = self.acc.get_balance(&key);
Some(Response::Balance { key, val }) Some(Response::Balance { key, val })
} }
Request::GetEntries { .. } => Some(Response::Entries { entries: vec![] }), Request::GetEntries { last_id } => {
self.sync();
let entries = self.ledger
.iter()
.skip_while(|x| x.id != last_id) // log(n) way to find Entry with id == last_id.
.skip(1) // Skip the entry with last_id.
.take(256) // TODO: Take while the serialized entries fit into a 64k UDP packet.
.cloned()
.collect();
Some(Response::Entries { entries })
}
Request::GetId { is_last } => Some(Response::Id { Request::GetId { is_last } => Some(Response::Id {
id: if is_last { id: if is_last {
self.acc.sync(); self.sync()
self.acc.last_id
} else { } else {
self.acc.first_id self.acc.first_id
}, },

View File

@ -14,7 +14,6 @@ use accountant_skel::{Request, Response};
pub struct AccountantStub { pub struct AccountantStub {
pub addr: String, pub addr: String,
pub socket: UdpSocket, pub socket: UdpSocket,
pub last_id: Option<Hash>,
} }
impl AccountantStub { impl AccountantStub {
@ -22,7 +21,6 @@ impl AccountantStub {
AccountantStub { AccountantStub {
addr: addr.to_string(), addr: addr.to_string(),
socket, socket,
last_id: None,
} }
} }
@ -75,38 +73,47 @@ impl AccountantStub {
self.get_id(true) self.get_id(true)
} }
pub fn wait_on_signature(&mut self, wait_sig: &Signature) -> io::Result<()> { pub fn check_on_signature(
let last_id = match self.last_id { &mut self,
None => { wait_sig: &Signature,
let first_id = self.get_id(false)?; last_id: &Hash,
self.last_id = Some(first_id); ) -> io::Result<(bool, Hash)> {
first_id let mut last_id = *last_id;
}
Some(last_id) => last_id,
};
let req = Request::GetEntries { last_id }; let req = Request::GetEntries { last_id };
let data = serialize(&req).unwrap(); let data = serialize(&req).unwrap();
self.socket.send_to(&data, &self.addr).map(|_| ())?; self.socket.send_to(&data, &self.addr).map(|_| ())?;
let mut buf = vec![0u8; 1024]; let mut buf = vec![0u8; 65_535];
self.socket.recv_from(&mut buf)?; self.socket.recv_from(&mut buf)?;
let resp = deserialize(&buf).expect("deserialize signature"); let resp = deserialize(&buf).expect("deserialize signature");
let mut found = false;
if let Response::Entries { entries } = resp { if let Response::Entries { entries } = resp {
for Entry { id, events, .. } in entries { for Entry { id, events, .. } in entries {
self.last_id = Some(id); last_id = id;
for event in events { if !found {
if let Some(sig) = event.get_signature() { for event in events {
if sig == *wait_sig { if let Some(sig) = event.get_signature() {
return Ok(()); if sig == *wait_sig {
found = true;
}
} }
} }
} }
} }
} }
// TODO: Loop until we found it. Ok((found, last_id))
Ok(()) }
pub fn wait_on_signature(&mut self, wait_sig: &Signature, last_id: &Hash) -> io::Result<Hash> {
let mut found = false;
let mut last_id = *last_id;
while !found {
let ret = self.check_on_signature(wait_sig, &last_id)?;
found = ret.0;
last_id = ret.1;
}
Ok(last_id)
} }
} }
@ -126,7 +133,7 @@ mod tests {
let addr = "127.0.0.1:9000"; let addr = "127.0.0.1:9000";
let send_addr = "127.0.0.1:9001"; let send_addr = "127.0.0.1:9001";
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let acc = Accountant::new(&alice, None); let acc = Accountant::new(&alice, Some(30));
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(Mutex::new(false)); let exit = Arc::new(Mutex::new(false));
let acc = Arc::new(Mutex::new(AccountantSkel::new(acc))); let acc = Arc::new(Mutex::new(AccountantSkel::new(acc)));
@ -138,7 +145,7 @@ mod tests {
let last_id = acc.get_last_id().unwrap(); let last_id = acc.get_last_id().unwrap();
let sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) let sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap(); .unwrap();
acc.wait_on_signature(&sig).unwrap(); acc.wait_on_signature(&sig, &last_id).unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500); assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500);
*exit.lock().unwrap() = true; *exit.lock().unwrap() = true;
for t in threads.iter() { for t in threads.iter() {

View File

@ -65,7 +65,7 @@ fn main() {
acc.transfer_signed(tr).unwrap(); acc.transfer_signed(tr).unwrap();
} }
println!("Waiting for last transaction to be confirmed...",); println!("Waiting for last transaction to be confirmed...",);
acc.wait_on_signature(&sig).unwrap(); acc.wait_on_signature(&sig, &last_id).unwrap();
let duration = now.elapsed(); let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64; let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64;

View File

@ -5,7 +5,7 @@ use std::thread::{spawn, JoinHandle};
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::time::Instant; use std::time::Instant;
use hash::{hash, Hash}; use hash::Hash;
use entry::Entry; use entry::Entry;
use recorder::{ExitReason, Recorder, Signal}; use recorder::{ExitReason, Recorder, Signal};
use signature::Signature; use signature::Signature;
@ -48,8 +48,7 @@ impl Historian {
return err; return err;
} }
if ms_per_tick.is_some() { if ms_per_tick.is_some() {
recorder.last_id = hash(&recorder.last_id); recorder.hash();
recorder.num_hashes += 1;
} }
} }
}) })
@ -127,12 +126,9 @@ mod tests {
hist.sender.send(Signal::Tick).unwrap(); hist.sender.send(Signal::Tick).unwrap();
drop(hist.sender); drop(hist.sender);
let entries: Vec<Entry> = hist.receiver.iter().collect(); let entries: Vec<Entry> = hist.receiver.iter().collect();
assert!(entries.len() > 1);
// Ensure one entry is sent back for each tick sent in. // Ensure the ID is not the seed.
assert_eq!(entries.len(), 1);
// Ensure the ID is not the seed, which indicates another Tick
// was recorded before the one we sent.
assert_ne!(entries[0].id, zero); assert_ne!(entries[0].id, zero);
} }
} }

View File

@ -8,10 +8,9 @@
use std::sync::mpsc::{Receiver, SyncSender, TryRecvError}; use std::sync::mpsc::{Receiver, SyncSender, TryRecvError};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::mem; use std::mem;
use hash::Hash; use hash::{hash, Hash};
use entry::{create_entry_mut, Entry}; use entry::{create_entry_mut, Entry};
use event::Event; use event::Event;
use serde_json;
pub enum Signal { pub enum Signal {
Tick, Tick,
@ -25,12 +24,12 @@ pub enum ExitReason {
} }
pub struct Recorder { pub struct Recorder {
pub sender: SyncSender<Entry>, sender: SyncSender<Entry>,
pub receiver: Receiver<Signal>, receiver: Receiver<Signal>,
pub last_id: Hash, last_hash: Hash,
pub events: Vec<Event>, events: Vec<Event>,
pub num_hashes: u64, num_hashes: u64,
pub num_ticks: u64, num_ticks: u64,
} }
impl Recorder { impl Recorder {
@ -38,18 +37,25 @@ impl Recorder {
Recorder { Recorder {
receiver, receiver,
sender, sender,
last_id: start_hash, last_hash: start_hash,
events: vec![], events: vec![],
num_hashes: 0, num_hashes: 0,
num_ticks: 0, num_ticks: 0,
} }
} }
pub fn record_entry(&mut self) -> Result<Entry, ExitReason> { pub fn hash(&mut self) {
self.last_hash = hash(&self.last_hash);
self.num_hashes += 1;
}
pub fn record_entry(&mut self) -> Result<(), ExitReason> {
let events = mem::replace(&mut self.events, vec![]); let events = mem::replace(&mut self.events, vec![]);
let entry = create_entry_mut(&mut self.last_id, &mut self.num_hashes, events); let entry = create_entry_mut(&mut self.last_hash, &mut self.num_hashes, events);
println!("{}", serde_json::to_string(&entry).unwrap()); self.sender
Ok(entry) .send(entry)
.or(Err(ExitReason::SendDisconnected))?;
Ok(())
} }
pub fn process_events( pub fn process_events(
@ -68,10 +74,7 @@ impl Recorder {
match self.receiver.try_recv() { match self.receiver.try_recv() {
Ok(signal) => match signal { Ok(signal) => match signal {
Signal::Tick => { Signal::Tick => {
let entry = self.record_entry()?; self.record_entry()?;
self.sender
.send(entry)
.or(Err(ExitReason::SendDisconnected))?;
} }
Signal::Event(event) => { Signal::Event(event) => {
self.events.push(event); self.events.push(event);

View File

@ -7,7 +7,7 @@ use std::thread::{spawn, JoinHandle};
use result::{Error, Result}; use result::{Error, Result};
const BLOCK_SIZE: usize = 1024 * 8; const BLOCK_SIZE: usize = 1024 * 8;
pub const PACKET_SIZE: usize = 256; pub const PACKET_SIZE: usize = 1024;
#[derive(Clone)] #[derive(Clone)]
pub struct Packet { pub struct Packet {