Merge branch 'master' into 156__remove_user_keys_in_mintdemo
This commit is contained in:
commit
05a5e551d6
|
@ -12,10 +12,6 @@ authors = [
|
|||
]
|
||||
license = "Apache-2.0"
|
||||
|
||||
[[bin]]
|
||||
name = "solana-historian-demo"
|
||||
path = "src/bin/historian-demo.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "solana-client-demo"
|
||||
path = "src/bin/client-demo.rs"
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
theme: jekyll-theme-slate
|
|
@ -6,6 +6,7 @@
|
|||
extern crate libc;
|
||||
|
||||
use chrono::prelude::*;
|
||||
use entry::Entry;
|
||||
use event::Event;
|
||||
use hash::Hash;
|
||||
use mint::Mint;
|
||||
|
@ -15,8 +16,8 @@ use signature::{KeyPair, PublicKey, Signature};
|
|||
use std::collections::hash_map::Entry::Occupied;
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::result;
|
||||
use std::sync::atomic::{AtomicIsize, Ordering};
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::{AtomicIsize, Ordering};
|
||||
use transaction::Transaction;
|
||||
|
||||
pub const MAX_ENTRY_IDS: usize = 1024 * 4;
|
||||
|
@ -32,12 +33,19 @@ pub type Result<T> = result::Result<T, AccountingError>;
|
|||
|
||||
/// Commit funds to the 'to' party.
|
||||
fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &Payment) {
|
||||
// First we check balances with a read lock to maximize potential parallelization.
|
||||
if balances.read().unwrap().contains_key(&payment.to) {
|
||||
let bals = balances.read().unwrap();
|
||||
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
|
||||
} else {
|
||||
// Now we know the key wasn't present a nanosecond ago, but it might be there
|
||||
// by the time we aquire a write lock, so we'll have to check again.
|
||||
let mut bals = balances.write().unwrap();
|
||||
bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize));
|
||||
if bals.contains_key(&payment.to) {
|
||||
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
|
||||
} else {
|
||||
bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,9 +77,9 @@ impl Accountant {
|
|||
to: mint.pubkey(),
|
||||
tokens: mint.tokens,
|
||||
};
|
||||
let acc = Self::new_from_deposit(&deposit);
|
||||
acc.register_entry_id(&mint.last_id());
|
||||
acc
|
||||
let accountant = Self::new_from_deposit(&deposit);
|
||||
accountant.register_entry_id(&mint.last_id());
|
||||
accountant
|
||||
}
|
||||
|
||||
/// Return the last entry ID registered
|
||||
|
@ -232,6 +240,16 @@ impl Accountant {
|
|||
results
|
||||
}
|
||||
|
||||
pub fn process_verified_entries(&self, entries: Vec<Entry>) -> Result<()> {
|
||||
for entry in entries {
|
||||
self.register_entry_id(&entry.id);
|
||||
for result in self.process_verified_events(entry.events) {
|
||||
result?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Process a Witness Signature that has already been verified.
|
||||
fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
|
||||
if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) {
|
||||
|
@ -339,24 +357,26 @@ mod tests {
|
|||
fn test_accountant() {
|
||||
let alice = Mint::new(10_000);
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let acc = Accountant::new(&alice);
|
||||
assert_eq!(acc.last_id(), alice.last_id());
|
||||
let accountant = Accountant::new(&alice);
|
||||
assert_eq!(accountant.last_id(), alice.last_id());
|
||||
|
||||
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||
accountant
|
||||
.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||
.unwrap();
|
||||
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
|
||||
assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_000);
|
||||
|
||||
acc.transfer(500, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||
accountant
|
||||
.transfer(500, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||
.unwrap();
|
||||
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500);
|
||||
assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_500);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_account_not_found() {
|
||||
let mint = Mint::new(1);
|
||||
let acc = Accountant::new(&mint);
|
||||
let accountant = Accountant::new(&mint);
|
||||
assert_eq!(
|
||||
acc.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()),
|
||||
accountant.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()),
|
||||
Err(AccountingError::AccountNotFound)
|
||||
);
|
||||
}
|
||||
|
@ -364,141 +384,156 @@ mod tests {
|
|||
#[test]
|
||||
fn test_invalid_transfer() {
|
||||
let alice = Mint::new(11_000);
|
||||
let acc = Accountant::new(&alice);
|
||||
let accountant = Accountant::new(&alice);
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||
accountant
|
||||
.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
acc.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()),
|
||||
accountant.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()),
|
||||
Err(AccountingError::InsufficientFunds)
|
||||
);
|
||||
|
||||
let alice_pubkey = alice.keypair().pubkey();
|
||||
assert_eq!(acc.get_balance(&alice_pubkey).unwrap(), 10_000);
|
||||
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
|
||||
assert_eq!(accountant.get_balance(&alice_pubkey).unwrap(), 10_000);
|
||||
assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_transfer_to_newb() {
|
||||
let alice = Mint::new(10_000);
|
||||
let acc = Accountant::new(&alice);
|
||||
let accountant = Accountant::new(&alice);
|
||||
let alice_keypair = alice.keypair();
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
acc.transfer(500, &alice_keypair, bob_pubkey, alice.last_id())
|
||||
accountant
|
||||
.transfer(500, &alice_keypair, bob_pubkey, alice.last_id())
|
||||
.unwrap();
|
||||
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);
|
||||
assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 500);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_transfer_on_date() {
|
||||
let alice = Mint::new(1);
|
||||
let acc = Accountant::new(&alice);
|
||||
let accountant = Accountant::new(&alice);
|
||||
let alice_keypair = alice.keypair();
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let dt = Utc::now();
|
||||
acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
|
||||
accountant
|
||||
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
|
||||
.unwrap();
|
||||
|
||||
// Alice's balance will be zero because all funds are locked up.
|
||||
assert_eq!(acc.get_balance(&alice.pubkey()), Some(0));
|
||||
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0));
|
||||
|
||||
// Bob's balance will be None because the funds have not been
|
||||
// sent.
|
||||
assert_eq!(acc.get_balance(&bob_pubkey), None);
|
||||
assert_eq!(accountant.get_balance(&bob_pubkey), None);
|
||||
|
||||
// Now, acknowledge the time in the condition occurred and
|
||||
// that bob's funds are now available.
|
||||
acc.process_verified_timestamp(alice.pubkey(), dt).unwrap();
|
||||
assert_eq!(acc.get_balance(&bob_pubkey), Some(1));
|
||||
accountant
|
||||
.process_verified_timestamp(alice.pubkey(), dt)
|
||||
.unwrap();
|
||||
assert_eq!(accountant.get_balance(&bob_pubkey), Some(1));
|
||||
|
||||
acc.process_verified_timestamp(alice.pubkey(), dt).unwrap(); // <-- Attack! Attempt to process completed transaction.
|
||||
assert_ne!(acc.get_balance(&bob_pubkey), Some(2));
|
||||
accountant
|
||||
.process_verified_timestamp(alice.pubkey(), dt)
|
||||
.unwrap(); // <-- Attack! Attempt to process completed transaction.
|
||||
assert_ne!(accountant.get_balance(&bob_pubkey), Some(2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_transfer_after_date() {
|
||||
let alice = Mint::new(1);
|
||||
let acc = Accountant::new(&alice);
|
||||
let accountant = Accountant::new(&alice);
|
||||
let alice_keypair = alice.keypair();
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let dt = Utc::now();
|
||||
acc.process_verified_timestamp(alice.pubkey(), dt).unwrap();
|
||||
|
||||
// It's now past now, so this transfer should be processed immediately.
|
||||
acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
|
||||
accountant
|
||||
.process_verified_timestamp(alice.pubkey(), dt)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(acc.get_balance(&alice.pubkey()), Some(0));
|
||||
assert_eq!(acc.get_balance(&bob_pubkey), Some(1));
|
||||
// It's now past now, so this transfer should be processed immediately.
|
||||
accountant
|
||||
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0));
|
||||
assert_eq!(accountant.get_balance(&bob_pubkey), Some(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cancel_transfer() {
|
||||
let alice = Mint::new(1);
|
||||
let acc = Accountant::new(&alice);
|
||||
let accountant = Accountant::new(&alice);
|
||||
let alice_keypair = alice.keypair();
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let dt = Utc::now();
|
||||
let sig = acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
|
||||
let sig = accountant
|
||||
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
|
||||
.unwrap();
|
||||
|
||||
// Alice's balance will be zero because all funds are locked up.
|
||||
assert_eq!(acc.get_balance(&alice.pubkey()), Some(0));
|
||||
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0));
|
||||
|
||||
// Bob's balance will be None because the funds have not been
|
||||
// sent.
|
||||
assert_eq!(acc.get_balance(&bob_pubkey), None);
|
||||
assert_eq!(accountant.get_balance(&bob_pubkey), None);
|
||||
|
||||
// Now, cancel the trancaction. Alice gets her funds back, Bob never sees them.
|
||||
acc.process_verified_sig(alice.pubkey(), sig).unwrap();
|
||||
assert_eq!(acc.get_balance(&alice.pubkey()), Some(1));
|
||||
assert_eq!(acc.get_balance(&bob_pubkey), None);
|
||||
accountant
|
||||
.process_verified_sig(alice.pubkey(), sig)
|
||||
.unwrap();
|
||||
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1));
|
||||
assert_eq!(accountant.get_balance(&bob_pubkey), None);
|
||||
|
||||
acc.process_verified_sig(alice.pubkey(), sig).unwrap(); // <-- Attack! Attempt to cancel completed transaction.
|
||||
assert_ne!(acc.get_balance(&alice.pubkey()), Some(2));
|
||||
accountant
|
||||
.process_verified_sig(alice.pubkey(), sig)
|
||||
.unwrap(); // <-- Attack! Attempt to cancel completed transaction.
|
||||
assert_ne!(accountant.get_balance(&alice.pubkey()), Some(2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duplicate_event_signature() {
|
||||
let alice = Mint::new(1);
|
||||
let acc = Accountant::new(&alice);
|
||||
let accountant = Accountant::new(&alice);
|
||||
let sig = Signature::default();
|
||||
assert!(acc.reserve_signature_with_last_id(&sig, &alice.last_id()));
|
||||
assert!(!acc.reserve_signature_with_last_id(&sig, &alice.last_id()));
|
||||
assert!(accountant.reserve_signature_with_last_id(&sig, &alice.last_id()));
|
||||
assert!(!accountant.reserve_signature_with_last_id(&sig, &alice.last_id()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_forget_signature() {
|
||||
let alice = Mint::new(1);
|
||||
let acc = Accountant::new(&alice);
|
||||
let accountant = Accountant::new(&alice);
|
||||
let sig = Signature::default();
|
||||
acc.reserve_signature_with_last_id(&sig, &alice.last_id());
|
||||
assert!(acc.forget_signature_with_last_id(&sig, &alice.last_id()));
|
||||
assert!(!acc.forget_signature_with_last_id(&sig, &alice.last_id()));
|
||||
accountant.reserve_signature_with_last_id(&sig, &alice.last_id());
|
||||
assert!(accountant.forget_signature_with_last_id(&sig, &alice.last_id()));
|
||||
assert!(!accountant.forget_signature_with_last_id(&sig, &alice.last_id()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_max_entry_ids() {
|
||||
let alice = Mint::new(1);
|
||||
let acc = Accountant::new(&alice);
|
||||
let accountant = Accountant::new(&alice);
|
||||
let sig = Signature::default();
|
||||
for i in 0..MAX_ENTRY_IDS {
|
||||
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
|
||||
acc.register_entry_id(&last_id);
|
||||
accountant.register_entry_id(&last_id);
|
||||
}
|
||||
// Assert we're no longer able to use the oldest entry ID.
|
||||
assert!(!acc.reserve_signature_with_last_id(&sig, &alice.last_id()));
|
||||
assert!(!accountant.reserve_signature_with_last_id(&sig, &alice.last_id()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_debits_before_credits() {
|
||||
let mint = Mint::new(2);
|
||||
let acc = Accountant::new(&mint);
|
||||
let accountant = Accountant::new(&mint);
|
||||
let alice = KeyPair::new();
|
||||
let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
||||
let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
||||
let trs = vec![tr0, tr1];
|
||||
assert!(acc.process_verified_transactions(trs)[1].is_err());
|
||||
assert!(accountant.process_verified_transactions(trs)[1].is_err());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -514,7 +549,7 @@ mod bench {
|
|||
#[bench]
|
||||
fn process_verified_event_bench(bencher: &mut Bencher) {
|
||||
let mint = Mint::new(100_000_000);
|
||||
let acc = Accountant::new(&mint);
|
||||
let accountant = Accountant::new(&mint);
|
||||
// Create transactions between unrelated parties.
|
||||
let transactions: Vec<_> = (0..4096)
|
||||
.into_par_iter()
|
||||
|
@ -522,15 +557,15 @@ mod bench {
|
|||
// Seed the 'from' account.
|
||||
let rando0 = KeyPair::new();
|
||||
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id());
|
||||
acc.process_verified_transaction(&tr).unwrap();
|
||||
accountant.process_verified_transaction(&tr).unwrap();
|
||||
|
||||
// Seed the 'to' account and a cell for its signature.
|
||||
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
|
||||
acc.register_entry_id(&last_id);
|
||||
accountant.register_entry_id(&last_id);
|
||||
|
||||
let rando1 = KeyPair::new();
|
||||
let tr = Transaction::new(&rando0, rando1.pubkey(), 1, last_id);
|
||||
acc.process_verified_transaction(&tr).unwrap();
|
||||
accountant.process_verified_transaction(&tr).unwrap();
|
||||
|
||||
// Finally, return a transaction that's unique
|
||||
Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
|
||||
|
@ -538,12 +573,13 @@ mod bench {
|
|||
.collect();
|
||||
bencher.iter(|| {
|
||||
// Since benchmarker runs this multiple times, we need to clear the signatures.
|
||||
for sigs in acc.last_ids.read().unwrap().iter() {
|
||||
for sigs in accountant.last_ids.read().unwrap().iter() {
|
||||
sigs.1.write().unwrap().clear();
|
||||
}
|
||||
|
||||
assert!(
|
||||
acc.process_verified_transactions(transactions.clone())
|
||||
accountant
|
||||
.process_verified_transactions(transactions.clone())
|
||||
.iter()
|
||||
.all(|x| x.is_ok())
|
||||
);
|
||||
|
|
|
@ -0,0 +1,173 @@
|
|||
//! The `accounting_stage` module implements the accounting stage of the TPU.
|
||||
|
||||
use accountant::Accountant;
|
||||
use entry::Entry;
|
||||
use event::Event;
|
||||
use hash::Hash;
|
||||
use historian::Historian;
|
||||
use recorder::Signal;
|
||||
use result::Result;
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
pub struct AccountingStage {
|
||||
pub output: Mutex<Receiver<Entry>>,
|
||||
entry_sender: Mutex<Sender<Entry>>,
|
||||
pub accountant: Arc<Accountant>,
|
||||
historian_input: Mutex<Sender<Signal>>,
|
||||
historian: Mutex<Historian>,
|
||||
}
|
||||
|
||||
impl AccountingStage {
|
||||
/// Create a new Tpu that wraps the given Accountant.
|
||||
pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
|
||||
let (historian_input, event_receiver) = channel();
|
||||
let historian = Historian::new(event_receiver, start_hash, ms_per_tick);
|
||||
let (entry_sender, output) = channel();
|
||||
AccountingStage {
|
||||
output: Mutex::new(output),
|
||||
entry_sender: Mutex::new(entry_sender),
|
||||
accountant: Arc::new(accountant),
|
||||
historian_input: Mutex::new(historian_input),
|
||||
historian: Mutex::new(historian),
|
||||
}
|
||||
}
|
||||
|
||||
/// Process the transactions in parallel and then log the successful ones.
|
||||
pub fn process_events(&self, events: Vec<Event>) -> Result<()> {
|
||||
let historian = self.historian.lock().unwrap();
|
||||
let results = self.accountant.process_verified_events(events);
|
||||
let events = results.into_iter().filter_map(|x| x.ok()).collect();
|
||||
let sender = self.historian_input.lock().unwrap();
|
||||
sender.send(Signal::Events(events))?;
|
||||
|
||||
// Wait for the historian to tag our Events with an ID and then register it.
|
||||
let entry = historian.output.lock().unwrap().recv()?;
|
||||
self.accountant.register_entry_id(&entry.id);
|
||||
self.entry_sender.lock().unwrap().send(entry)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use accountant::Accountant;
|
||||
use accounting_stage::AccountingStage;
|
||||
use entry::Entry;
|
||||
use event::Event;
|
||||
use mint::Mint;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use transaction::Transaction;
|
||||
|
||||
#[test]
|
||||
fn test_accounting_sequential_consistency() {
|
||||
// In this attack we'll demonstrate that a verifier can interpret the ledger
|
||||
// differently if either the server doesn't signal the ledger to add an
|
||||
// Entry OR if the verifier tries to parallelize across multiple Entries.
|
||||
let mint = Mint::new(2);
|
||||
let accountant = Accountant::new(&mint);
|
||||
let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None);
|
||||
|
||||
// Process a batch that includes a transaction that receives two tokens.
|
||||
let alice = KeyPair::new();
|
||||
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
||||
let events = vec![Event::Transaction(tr)];
|
||||
assert!(accounting_stage.process_events(events).is_ok());
|
||||
|
||||
// Process a second batch that spends one of those tokens.
|
||||
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
||||
let events = vec![Event::Transaction(tr)];
|
||||
assert!(accounting_stage.process_events(events).is_ok());
|
||||
|
||||
// Collect the ledger and feed it to a new accountant.
|
||||
drop(accounting_stage.entry_sender);
|
||||
let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();
|
||||
|
||||
// Assert the user holds one token, not two. If the server only output one
|
||||
// entry, then the second transaction will be rejected, because it drives
|
||||
// the account balance below zero before the credit is added.
|
||||
let accountant = Accountant::new(&mint);
|
||||
for entry in entries {
|
||||
assert!(
|
||||
accountant
|
||||
.process_verified_events(entry.events)
|
||||
.into_iter()
|
||||
.all(|x| x.is_ok())
|
||||
);
|
||||
}
|
||||
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1));
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "unstable", test))]
|
||||
mod bench {
|
||||
extern crate test;
|
||||
use self::test::Bencher;
|
||||
use accountant::{Accountant, MAX_ENTRY_IDS};
|
||||
use accounting_stage::*;
|
||||
use bincode::serialize;
|
||||
use hash::hash;
|
||||
use mint::Mint;
|
||||
use rayon::prelude::*;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use std::collections::HashSet;
|
||||
use std::time::Instant;
|
||||
use transaction::Transaction;
|
||||
|
||||
#[bench]
|
||||
fn process_events_bench(_bencher: &mut Bencher) {
|
||||
let mint = Mint::new(100_000_000);
|
||||
let accountant = Accountant::new(&mint);
|
||||
// Create transactions between unrelated parties.
|
||||
let txs = 100_000;
|
||||
let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
|
||||
let transactions: Vec<_> = (0..txs)
|
||||
.into_par_iter()
|
||||
.map(|i| {
|
||||
// Seed the 'to' account and a cell for its signature.
|
||||
let dummy_id = i % (MAX_ENTRY_IDS as i32);
|
||||
let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
|
||||
{
|
||||
let mut last_ids = last_ids.lock().unwrap();
|
||||
if !last_ids.contains(&last_id) {
|
||||
last_ids.insert(last_id);
|
||||
accountant.register_entry_id(&last_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Seed the 'from' account.
|
||||
let rando0 = KeyPair::new();
|
||||
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
|
||||
accountant.process_verified_transaction(&tr).unwrap();
|
||||
|
||||
let rando1 = KeyPair::new();
|
||||
let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
|
||||
accountant.process_verified_transaction(&tr).unwrap();
|
||||
|
||||
// Finally, return a transaction that's unique
|
||||
Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let events: Vec<_> = transactions
|
||||
.into_iter()
|
||||
.map(|tr| Event::Transaction(tr))
|
||||
.collect();
|
||||
|
||||
let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None);
|
||||
|
||||
let now = Instant::now();
|
||||
assert!(accounting_stage.process_events(events).is_ok());
|
||||
let duration = now.elapsed();
|
||||
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
|
||||
let tps = txs as f64 / sec;
|
||||
|
||||
// Ensure that all transactions were successfully logged.
|
||||
drop(accounting_stage.historian_input);
|
||||
let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();
|
||||
assert_eq!(entries.len(), 1);
|
||||
assert_eq!(entries[0].events.len(), txs as usize);
|
||||
|
||||
println!("{} tps", tps);
|
||||
}
|
||||
}
|
|
@ -87,10 +87,10 @@ fn main() {
|
|||
println!("Binding to {}", client_addr);
|
||||
let socket = UdpSocket::bind(&client_addr).unwrap();
|
||||
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
|
||||
let mut acc = ThinClient::new(addr.parse().unwrap(), socket);
|
||||
let mut accountant = ThinClient::new(addr.parse().unwrap(), socket);
|
||||
|
||||
println!("Get last ID...");
|
||||
let last_id = acc.get_last_id().wait().unwrap();
|
||||
let last_id = accountant.get_last_id().wait().unwrap();
|
||||
println!("Got last ID {:?}", last_id);
|
||||
|
||||
let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes());
|
||||
|
@ -122,7 +122,7 @@ fn main() {
|
|||
nsps / 1_000_f64
|
||||
);
|
||||
|
||||
let initial_tx_count = acc.transaction_count();
|
||||
let initial_tx_count = accountant.transaction_count();
|
||||
println!("initial count {}", initial_tx_count);
|
||||
|
||||
println!("Transfering {} transactions in {} batches", txs, threads);
|
||||
|
@ -134,16 +134,16 @@ fn main() {
|
|||
let mut client_addr: SocketAddr = client_addr.parse().unwrap();
|
||||
client_addr.set_port(0);
|
||||
let socket = UdpSocket::bind(client_addr).unwrap();
|
||||
let acc = ThinClient::new(addr.parse().unwrap(), socket);
|
||||
let accountant = ThinClient::new(addr.parse().unwrap(), socket);
|
||||
for tr in trs {
|
||||
acc.transfer_signed(tr.clone()).unwrap();
|
||||
accountant.transfer_signed(tr.clone()).unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
println!("Waiting for transactions to complete...",);
|
||||
let mut tx_count;
|
||||
for _ in 0..10 {
|
||||
tx_count = acc.transaction_count();
|
||||
tx_count = accountant.transaction_count();
|
||||
duration = now.elapsed();
|
||||
let txs = tx_count - initial_tx_count;
|
||||
println!("Transactions processed {}", txs);
|
||||
|
|
|
@ -8,7 +8,7 @@ extern crate untrusted;
|
|||
use isatty::stdin_isatty;
|
||||
use rayon::prelude::*;
|
||||
use solana::accountant::MAX_ENTRY_IDS;
|
||||
use solana::entry::{create_entry, next_tick};
|
||||
use solana::entry::{create_entry, next_entry};
|
||||
use solana::event::Event;
|
||||
use solana::mint::MintDemo;
|
||||
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
|
||||
|
@ -67,7 +67,7 @@ fn main() {
|
|||
// Offer client lots of entry IDs to use for each transaction's last_id.
|
||||
let mut last_id = last_id;
|
||||
for _ in 0..MAX_ENTRY_IDS {
|
||||
let entry = next_tick(&last_id, 1);
|
||||
let entry = next_entry(&last_id, 1, vec![]);
|
||||
last_id = entry.id;
|
||||
let serialized = serde_json::to_string(&entry).unwrap_or_else(|e| {
|
||||
eprintln!("failed to serialize: {}", e);
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
extern crate solana;
|
||||
|
||||
use solana::entry::Entry;
|
||||
use solana::event::Event;
|
||||
use solana::hash::Hash;
|
||||
use solana::historian::Historian;
|
||||
use solana::ledger::Block;
|
||||
use solana::recorder::Signal;
|
||||
use solana::signature::{KeyPair, KeyPairUtil};
|
||||
use solana::transaction::Transaction;
|
||||
use std::sync::mpsc::{sync_channel, SendError, SyncSender};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
fn create_ledger(input: &SyncSender<Signal>, seed: &Hash) -> Result<(), SendError<Signal>> {
|
||||
sleep(Duration::from_millis(15));
|
||||
let keypair = KeyPair::new();
|
||||
let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed);
|
||||
let signal0 = Signal::Event(Event::Transaction(tr));
|
||||
input.send(signal0)?;
|
||||
sleep(Duration::from_millis(10));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let seed = Hash::default();
|
||||
let hist = Historian::new(event_receiver, &seed, Some(10));
|
||||
create_ledger(&input, &seed).expect("send error");
|
||||
drop(input);
|
||||
let entries: Vec<Entry> = hist.output.lock().unwrap().iter().collect();
|
||||
for entry in &entries {
|
||||
println!("{:?}", entry);
|
||||
}
|
||||
// Proof-of-History: Verify the historian learned about the events
|
||||
// in the same order they appear in the vector.
|
||||
assert!(entries[..].verify(&seed));
|
||||
}
|
|
@ -7,19 +7,18 @@ extern crate solana;
|
|||
use getopts::Options;
|
||||
use isatty::stdin_isatty;
|
||||
use solana::accountant::Accountant;
|
||||
use solana::accounting_stage::AccountingStage;
|
||||
use solana::crdt::ReplicatedData;
|
||||
use solana::entry::Entry;
|
||||
use solana::event::Event;
|
||||
use solana::historian::Historian;
|
||||
use solana::signature::{KeyPair, KeyPairUtil};
|
||||
use solana::tpu::Tpu;
|
||||
use std::env;
|
||||
use std::io::{stdin, stdout, Read};
|
||||
use std::net::UdpSocket;
|
||||
use std::process::exit;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
fn print_usage(program: &str, opts: Options) {
|
||||
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
|
||||
|
@ -55,7 +54,7 @@ fn main() {
|
|||
let serve_addr = format!("0.0.0.0:{}", port);
|
||||
let gossip_addr = format!("0.0.0.0:{}", port + 1);
|
||||
let replicate_addr = format!("0.0.0.0:{}", port + 2);
|
||||
let skinny_addr = format!("0.0.0.0:{}", port + 3);
|
||||
let events_addr = format!("0.0.0.0:{}", port + 3);
|
||||
|
||||
if stdin_isatty() {
|
||||
eprintln!("nothing found on stdin, expected a log file");
|
||||
|
@ -95,35 +94,34 @@ fn main() {
|
|||
|
||||
eprintln!("creating accountant...");
|
||||
|
||||
let acc = Accountant::new_from_deposit(&deposit.unwrap());
|
||||
acc.register_entry_id(&entry0.id);
|
||||
acc.register_entry_id(&entry1.id);
|
||||
let accountant = Accountant::new_from_deposit(&deposit.unwrap());
|
||||
accountant.register_entry_id(&entry0.id);
|
||||
accountant.register_entry_id(&entry1.id);
|
||||
|
||||
eprintln!("processing entries...");
|
||||
|
||||
let mut last_id = entry1.id;
|
||||
for entry in entries {
|
||||
last_id = entry.id;
|
||||
let results = acc.process_verified_events(entry.events);
|
||||
let results = accountant.process_verified_events(entry.events);
|
||||
for result in results {
|
||||
if let Err(e) = result {
|
||||
eprintln!("failed to process event {:?}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
acc.register_entry_id(&last_id);
|
||||
accountant.register_entry_id(&last_id);
|
||||
}
|
||||
|
||||
eprintln!("creating networking stack...");
|
||||
|
||||
let (input, event_receiver) = sync_channel(10_000);
|
||||
let historian = Historian::new(event_receiver, &last_id, Some(1000));
|
||||
let accounting_stage = AccountingStage::new(accountant, &last_id, Some(1000));
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let tpu = Arc::new(Tpu::new(acc, input, historian));
|
||||
let tpu = Arc::new(Tpu::new(accounting_stage));
|
||||
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
|
||||
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
|
||||
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
|
||||
let skinny_sock = UdpSocket::bind(&skinny_addr).unwrap();
|
||||
let events_sock = UdpSocket::bind(&events_addr).unwrap();
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
let d = ReplicatedData::new(
|
||||
pubkey,
|
||||
|
@ -136,7 +134,7 @@ fn main() {
|
|||
&tpu,
|
||||
d,
|
||||
serve_sock,
|
||||
skinny_sock,
|
||||
events_sock,
|
||||
gossip_sock,
|
||||
exit.clone(),
|
||||
stdout(),
|
||||
|
|
|
@ -134,9 +134,9 @@ mod tests {
|
|||
use ecdsa;
|
||||
use packet::{Packet, Packets, SharedPackets};
|
||||
use std::sync::RwLock;
|
||||
use tpu::Request;
|
||||
use transaction::test_tx;
|
||||
use thin_client_service::Request;
|
||||
use transaction::Transaction;
|
||||
use transaction::test_tx;
|
||||
|
||||
fn make_packet_from_transaction(tr: Transaction) -> Packet {
|
||||
let tx = serialize(&Request::Transaction(tr)).unwrap();
|
||||
|
|
16
src/entry.rs
16
src/entry.rs
|
@ -103,12 +103,12 @@ pub fn create_entry_mut(start_hash: &mut Hash, cur_hashes: &mut u64, events: Vec
|
|||
entry
|
||||
}
|
||||
|
||||
/// Creates the next Tick Entry `num_hashes` after `start_hash`.
|
||||
pub fn next_tick(start_hash: &Hash, num_hashes: u64) -> Entry {
|
||||
/// Creates the next Tick or Event Entry `num_hashes` after `start_hash`.
|
||||
pub fn next_entry(start_hash: &Hash, num_hashes: u64, events: Vec<Event>) -> Entry {
|
||||
Entry {
|
||||
num_hashes,
|
||||
id: next_hash(start_hash, num_hashes, &[]),
|
||||
events: vec![],
|
||||
id: next_hash(start_hash, num_hashes, &events),
|
||||
events: events,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,8 +128,8 @@ mod tests {
|
|||
let one = hash(&zero);
|
||||
assert!(Entry::new_tick(0, &zero).verify(&zero)); // base case
|
||||
assert!(!Entry::new_tick(0, &zero).verify(&one)); // base case, bad
|
||||
assert!(next_tick(&zero, 1).verify(&zero)); // inductive step
|
||||
assert!(!next_tick(&zero, 1).verify(&one)); // inductive step, bad
|
||||
assert!(next_entry(&zero, 1, vec![]).verify(&zero)); // inductive step
|
||||
assert!(!next_entry(&zero, 1, vec![]).verify(&one)); // inductive step, bad
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -167,9 +167,9 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_next_tick() {
|
||||
fn test_next_entry() {
|
||||
let zero = Hash::default();
|
||||
let tick = next_tick(&zero, 1);
|
||||
let tick = next_entry(&zero, 1, vec![]);
|
||||
assert_eq!(tick.num_hashes, 1);
|
||||
assert_ne!(tick.id, zero);
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! The `hash` module provides functions for creating SHA-256 hashes.
|
||||
|
||||
use generic_array::typenum::U32;
|
||||
use generic_array::GenericArray;
|
||||
use generic_array::typenum::U32;
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
pub type Hash = GenericArray<u8, U32>;
|
||||
|
|
|
@ -4,13 +4,13 @@
|
|||
use entry::Entry;
|
||||
use hash::Hash;
|
||||
use recorder::{ExitReason, Recorder, Signal};
|
||||
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Mutex;
|
||||
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::time::Instant;
|
||||
|
||||
pub struct Historian {
|
||||
pub output: Arc<Mutex<Receiver<Entry>>>,
|
||||
pub output: Mutex<Receiver<Entry>>,
|
||||
pub thread_hdl: JoinHandle<ExitReason>,
|
||||
}
|
||||
|
||||
|
@ -20,12 +20,11 @@ impl Historian {
|
|||
start_hash: &Hash,
|
||||
ms_per_tick: Option<u64>,
|
||||
) -> Self {
|
||||
let (entry_sender, output) = sync_channel(10_000);
|
||||
let (entry_sender, output) = channel();
|
||||
let thread_hdl =
|
||||
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
|
||||
let loutput = Arc::new(Mutex::new(output));
|
||||
Historian {
|
||||
output: loutput,
|
||||
output: Mutex::new(output),
|
||||
thread_hdl,
|
||||
}
|
||||
}
|
||||
|
@ -36,7 +35,7 @@ impl Historian {
|
|||
start_hash: Hash,
|
||||
ms_per_tick: Option<u64>,
|
||||
receiver: Receiver<Signal>,
|
||||
sender: SyncSender<Entry>,
|
||||
sender: Sender<Entry>,
|
||||
) -> JoinHandle<ExitReason> {
|
||||
spawn(move || {
|
||||
let mut recorder = Recorder::new(receiver, sender, start_hash);
|
||||
|
@ -66,7 +65,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_historian() {
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let (input, event_receiver) = channel();
|
||||
let zero = Hash::default();
|
||||
let hist = Historian::new(event_receiver, &zero, None);
|
||||
|
||||
|
@ -95,7 +94,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_historian_closed_sender() {
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let (input, event_receiver) = channel();
|
||||
let zero = Hash::default();
|
||||
let hist = Historian::new(event_receiver, &zero, None);
|
||||
drop(hist.output);
|
||||
|
@ -108,7 +107,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_ticking_historian() {
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let (input, event_receiver) = channel();
|
||||
let zero = Hash::default();
|
||||
let hist = Historian::new(event_receiver, &zero, Some(20));
|
||||
sleep(Duration::from_millis(300));
|
||||
|
|
155
src/ledger.rs
155
src/ledger.rs
|
@ -1,9 +1,17 @@
|
|||
//! The `ledger` module provides functions for parallel verification of the
|
||||
//! Proof of History ledger.
|
||||
|
||||
use entry::{next_tick, Entry};
|
||||
use bincode::{deserialize, serialize_into};
|
||||
use entry::{next_entry, Entry};
|
||||
use event::Event;
|
||||
use hash::Hash;
|
||||
use packet;
|
||||
use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
|
||||
use rayon::prelude::*;
|
||||
use std::cmp::min;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::Cursor;
|
||||
use std::mem::size_of;
|
||||
|
||||
pub trait Block {
|
||||
/// Verifies the hashes and counts of a slice of events are all consistent.
|
||||
|
@ -18,22 +26,108 @@ impl Block for [Entry] {
|
|||
}
|
||||
}
|
||||
|
||||
/// Create a vector of Ticks of length `len` from `start_hash` hash and `num_hashes`.
|
||||
pub fn next_ticks(start_hash: &Hash, num_hashes: u64, len: usize) -> Vec<Entry> {
|
||||
/// Create a vector of Entries of length `event_set.len()` from `start_hash` hash, `num_hashes`, and `event_set`.
|
||||
pub fn next_entries(start_hash: &Hash, num_hashes: u64, event_set: Vec<Vec<Event>>) -> Vec<Entry> {
|
||||
let mut id = *start_hash;
|
||||
let mut ticks = vec![];
|
||||
for _ in 0..len {
|
||||
let entry = next_tick(&id, num_hashes);
|
||||
let mut entries = vec![];
|
||||
for event_list in &event_set {
|
||||
let events = event_list.clone();
|
||||
let entry = next_entry(&id, num_hashes, events);
|
||||
id = entry.id;
|
||||
ticks.push(entry);
|
||||
entries.push(entry);
|
||||
}
|
||||
ticks
|
||||
entries
|
||||
}
|
||||
|
||||
pub fn process_entry_list_into_blobs(
|
||||
list: &Vec<Entry>,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
q: &mut VecDeque<SharedBlob>,
|
||||
) {
|
||||
let mut start = 0;
|
||||
let mut end = 0;
|
||||
while start < list.len() {
|
||||
let mut entries: Vec<Vec<Entry>> = Vec::new();
|
||||
let mut total = 0;
|
||||
for i in &list[start..] {
|
||||
total += size_of::<Event>() * i.events.len();
|
||||
total += size_of::<Entry>();
|
||||
if total >= BLOB_DATA_SIZE {
|
||||
break;
|
||||
}
|
||||
end += 1;
|
||||
}
|
||||
// See if we need to split the events
|
||||
if end <= start {
|
||||
let mut event_start = 0;
|
||||
let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
|
||||
let total_entry_chunks =
|
||||
(list[end].events.len() + num_events_per_blob - 1) / num_events_per_blob;
|
||||
trace!(
|
||||
"splitting events end: {} total_chunks: {}",
|
||||
end,
|
||||
total_entry_chunks
|
||||
);
|
||||
for _ in 0..total_entry_chunks {
|
||||
let event_end = min(event_start + num_events_per_blob, list[end].events.len());
|
||||
let mut entry = Entry {
|
||||
num_hashes: list[end].num_hashes,
|
||||
id: list[end].id,
|
||||
events: list[end].events[event_start..event_end].to_vec(),
|
||||
};
|
||||
entries.push(vec![entry]);
|
||||
event_start = event_end;
|
||||
}
|
||||
end += 1;
|
||||
} else {
|
||||
entries.push(list[start..end].to_vec());
|
||||
}
|
||||
|
||||
for entry in entries {
|
||||
let b = blob_recycler.allocate();
|
||||
let pos = {
|
||||
let mut bd = b.write().unwrap();
|
||||
let mut out = Cursor::new(bd.data_mut());
|
||||
serialize_into(&mut out, &entry).expect("failed to serialize output");
|
||||
out.position() as usize
|
||||
};
|
||||
assert!(pos < BLOB_SIZE);
|
||||
b.write().unwrap().set_size(pos);
|
||||
q.push_back(b);
|
||||
}
|
||||
start = end;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> Vec<Entry> {
|
||||
let mut entries_to_apply: Vec<Entry> = Vec::new();
|
||||
let mut last_id = Hash::default();
|
||||
for msgs in blobs {
|
||||
let blob = msgs.read().unwrap();
|
||||
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
|
||||
for entry in entries {
|
||||
if entry.id == last_id {
|
||||
if let Some(last_entry) = entries_to_apply.last_mut() {
|
||||
last_entry.events.extend(entry.events);
|
||||
}
|
||||
} else {
|
||||
last_id = entry.id;
|
||||
entries_to_apply.push(entry);
|
||||
}
|
||||
}
|
||||
//TODO respond back to leader with hash of the state
|
||||
}
|
||||
entries_to_apply
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use entry;
|
||||
use hash::hash;
|
||||
use packet::BlobRecycler;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use transaction::Transaction;
|
||||
|
||||
#[test]
|
||||
fn test_verify_slice() {
|
||||
|
@ -42,12 +136,51 @@ mod tests {
|
|||
assert!(vec![][..].verify(&zero)); // base case
|
||||
assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero)); // singleton case 1
|
||||
assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one)); // singleton case 2, bad
|
||||
assert!(next_ticks(&zero, 0, 2)[..].verify(&zero)); // inductive step
|
||||
assert!(next_entries(&zero, 0, vec![vec![]; 2])[..].verify(&zero)); // inductive step
|
||||
|
||||
let mut bad_ticks = next_ticks(&zero, 0, 2);
|
||||
let mut bad_ticks = next_entries(&zero, 0, vec![vec![]; 2]);
|
||||
bad_ticks[1].id = one;
|
||||
assert!(!bad_ticks.verify(&zero)); // inductive step, bad
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_entry_to_blobs() {
|
||||
let zero = Hash::default();
|
||||
let one = hash(&zero);
|
||||
let keypair = KeyPair::new();
|
||||
let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, one));
|
||||
let events = vec![tr0.clone(); 10000];
|
||||
let e0 = entry::create_entry(&zero, 0, events);
|
||||
|
||||
let entry_list = vec![e0.clone(); 1];
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
let mut blob_q = VecDeque::new();
|
||||
process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
|
||||
let entries = reconstruct_entries_from_blobs(&blob_q);
|
||||
|
||||
assert_eq!(entry_list, entries);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_next_entries() {
|
||||
let mut id = Hash::default();
|
||||
let next_id = hash(&id);
|
||||
let keypair = KeyPair::new();
|
||||
let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, next_id));
|
||||
let events = vec![tr0.clone(); 5];
|
||||
let event_set = vec![events.clone(); 5];
|
||||
let entries0 = next_entries(&id, 0, event_set);
|
||||
|
||||
assert_eq!(entries0.len(), 5);
|
||||
|
||||
let mut entries1 = vec![];
|
||||
for _ in 0..5 {
|
||||
let entry = next_entry(&id, 0, events.clone());
|
||||
id = entry.id;
|
||||
entries1.push(entry);
|
||||
}
|
||||
assert_eq!(entries0, entries1);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "unstable", test))]
|
||||
|
@ -59,7 +192,7 @@ mod bench {
|
|||
#[bench]
|
||||
fn event_bench(bencher: &mut Bencher) {
|
||||
let start_hash = Hash::default();
|
||||
let entries = next_ticks(&start_hash, 10_000, 8);
|
||||
let entries = next_entries(&start_hash, 10_000, vec![vec![]; 8]);
|
||||
bencher.iter(|| {
|
||||
assert!(entries.verify(&start_hash));
|
||||
});
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#![cfg_attr(feature = "unstable", feature(test))]
|
||||
pub mod accountant;
|
||||
pub mod accounting_stage;
|
||||
pub mod crdt;
|
||||
pub mod ecdsa;
|
||||
pub mod entry;
|
||||
|
@ -18,6 +19,7 @@ pub mod result;
|
|||
pub mod signature;
|
||||
pub mod streamer;
|
||||
pub mod thin_client;
|
||||
pub mod thin_client_service;
|
||||
pub mod timing;
|
||||
pub mod tpu;
|
||||
pub mod transaction;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! The `mint` module is a library for generating the chain's genesis block.
|
||||
|
||||
use entry::create_entry;
|
||||
use entry::Entry;
|
||||
use entry::create_entry;
|
||||
use event::Event;
|
||||
use hash::{hash, Hash};
|
||||
use ring::rand::SystemRandom;
|
||||
|
|
|
@ -8,15 +8,13 @@
|
|||
use entry::{create_entry_mut, Entry};
|
||||
use event::Event;
|
||||
use hash::{hash, Hash};
|
||||
use packet::BLOB_DATA_SIZE;
|
||||
use std::mem;
|
||||
use std::sync::mpsc::{Receiver, SyncSender, TryRecvError};
|
||||
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
|
||||
pub enum Signal {
|
||||
Tick,
|
||||
Event(Event),
|
||||
Events(Vec<Event>),
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
|
@ -26,21 +24,19 @@ pub enum ExitReason {
|
|||
}
|
||||
|
||||
pub struct Recorder {
|
||||
sender: SyncSender<Entry>,
|
||||
sender: Sender<Entry>,
|
||||
receiver: Receiver<Signal>,
|
||||
last_hash: Hash,
|
||||
events: Vec<Event>,
|
||||
num_hashes: u64,
|
||||
num_ticks: u64,
|
||||
}
|
||||
|
||||
impl Recorder {
|
||||
pub fn new(receiver: Receiver<Signal>, sender: SyncSender<Entry>, last_hash: Hash) -> Self {
|
||||
pub fn new(receiver: Receiver<Signal>, sender: Sender<Entry>, last_hash: Hash) -> Self {
|
||||
Recorder {
|
||||
receiver,
|
||||
sender,
|
||||
last_hash,
|
||||
events: vec![],
|
||||
num_hashes: 0,
|
||||
num_ticks: 0,
|
||||
}
|
||||
|
@ -51,8 +47,7 @@ impl Recorder {
|
|||
self.num_hashes += 1;
|
||||
}
|
||||
|
||||
pub fn record_entry(&mut self) -> Result<(), ExitReason> {
|
||||
let events = mem::replace(&mut self.events, vec![]);
|
||||
pub fn record_entry(&mut self, events: Vec<Event>) -> Result<(), ExitReason> {
|
||||
let entry = create_entry_mut(&mut self.last_hash, &mut self.num_hashes, events);
|
||||
self.sender
|
||||
.send(entry)
|
||||
|
@ -68,7 +63,7 @@ impl Recorder {
|
|||
loop {
|
||||
if let Some(ms) = ms_per_tick {
|
||||
if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) {
|
||||
self.record_entry()?;
|
||||
self.record_entry(vec![])?;
|
||||
self.num_ticks += 1;
|
||||
}
|
||||
}
|
||||
|
@ -76,17 +71,10 @@ impl Recorder {
|
|||
match self.receiver.try_recv() {
|
||||
Ok(signal) => match signal {
|
||||
Signal::Tick => {
|
||||
self.record_entry()?;
|
||||
self.record_entry(vec![])?;
|
||||
}
|
||||
Signal::Event(event) => {
|
||||
self.events.push(event);
|
||||
|
||||
// Record an entry early if we anticipate its serialized size will
|
||||
// be larger than 64kb. At the time of this writing, we assume each
|
||||
// event will be well under 256 bytes.
|
||||
if self.events.len() >= BLOB_DATA_SIZE / 256 {
|
||||
self.record_entry()?;
|
||||
}
|
||||
Signal::Events(events) => {
|
||||
self.record_entry(events)?;
|
||||
}
|
||||
},
|
||||
Err(TryRecvError::Empty) => return Ok(()),
|
||||
|
@ -99,30 +87,27 @@ impl Recorder {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use bincode::serialize;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::sync::mpsc::channel;
|
||||
use transaction::Transaction;
|
||||
|
||||
#[test]
|
||||
fn test_sub64k_entry_size() {
|
||||
let (signal_sender, signal_receiver) = sync_channel(500);
|
||||
let (entry_sender, entry_receiver) = sync_channel(10);
|
||||
fn test_events() {
|
||||
let (signal_sender, signal_receiver) = channel();
|
||||
let (entry_sender, entry_receiver) = channel();
|
||||
let zero = Hash::default();
|
||||
let mut recorder = Recorder::new(signal_receiver, entry_sender, zero);
|
||||
let alice_keypair = KeyPair::new();
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
for _ in 0..256 {
|
||||
let tx = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
|
||||
let event = Event::Transaction(tx);
|
||||
signal_sender.send(Signal::Event(event)).unwrap();
|
||||
}
|
||||
|
||||
let event0 = Event::Transaction(Transaction::new(&alice_keypair, bob_pubkey, 1, zero));
|
||||
let event1 = Event::Transaction(Transaction::new(&alice_keypair, bob_pubkey, 2, zero));
|
||||
signal_sender
|
||||
.send(Signal::Events(vec![event0, event1]))
|
||||
.unwrap();
|
||||
recorder.process_events(Instant::now(), None).unwrap();
|
||||
|
||||
drop(recorder.sender);
|
||||
let entries: Vec<_> = entry_receiver.iter().collect();
|
||||
assert_eq!(entries.len(), 1);
|
||||
assert!(serialize(&entries[0]).unwrap().len() <= 65_536);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,9 +78,9 @@ mod tests {
|
|||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::mpsc::RecvError;
|
||||
use std::sync::mpsc::RecvTimeoutError;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::thread;
|
||||
|
||||
fn addr_parse_error() -> Result<SocketAddr> {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! The `signature` module provides functionality for public, and private keys.
|
||||
|
||||
use generic_array::typenum::{U32, U64};
|
||||
use generic_array::GenericArray;
|
||||
use generic_array::typenum::{U32, U64};
|
||||
use rand::{ChaChaRng, Rng, SeedableRng};
|
||||
use rayon::prelude::*;
|
||||
use ring::error::Unspecified;
|
||||
|
|
|
@ -64,6 +64,25 @@ fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Res
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let msgs = recvr.recv_timeout(timer)?;
|
||||
debug!("got msgs");
|
||||
let mut len = msgs.read().unwrap().packets.len();
|
||||
let mut batch = vec![msgs];
|
||||
while let Ok(more) = recvr.try_recv() {
|
||||
trace!("got more msgs");
|
||||
len += more.read().unwrap().packets.len();
|
||||
batch.push(more);
|
||||
|
||||
if len > 100_000 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
debug!("batch len {}", batch.len());
|
||||
Ok((batch, len))
|
||||
}
|
||||
|
||||
pub fn responder(
|
||||
sock: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
|
@ -438,8 +457,8 @@ mod test {
|
|||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
|
||||
use streamer::{BlobReceiver, PacketReceiver};
|
||||
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
|
||||
|
||||
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
||||
for _t in 0..5 {
|
||||
|
@ -594,6 +613,7 @@ mod test {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
//retransmit from leader to replicate target
|
||||
pub fn retransmit() {
|
||||
logger::setup();
|
||||
|
|
|
@ -10,7 +10,7 @@ use signature::{KeyPair, PublicKey, Signature};
|
|||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use tpu::{Request, Response, Subscription};
|
||||
use thin_client_service::{Request, Response, Subscription};
|
||||
use transaction::Transaction;
|
||||
|
||||
pub struct ThinClient {
|
||||
|
@ -148,28 +148,27 @@ impl ThinClient {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use accountant::Accountant;
|
||||
use accounting_stage::AccountingStage;
|
||||
use crdt::{Crdt, ReplicatedData};
|
||||
use futures::Future;
|
||||
use historian::Historian;
|
||||
use logger;
|
||||
use mint::Mint;
|
||||
use plan::Plan;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use std::io::sink;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tpu::Tpu;
|
||||
use tpu::{self, Tpu};
|
||||
|
||||
// TODO: Figure out why this test sometimes hangs on TravisCI.
|
||||
#[test]
|
||||
fn test_thin_client() {
|
||||
logger::setup();
|
||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let skinny = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let addr = serve.local_addr().unwrap();
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
let d = ReplicatedData::new(
|
||||
|
@ -180,25 +179,33 @@ mod tests {
|
|||
);
|
||||
|
||||
let alice = Mint::new(10_000);
|
||||
let acc = Accountant::new(&alice);
|
||||
let accountant = Accountant::new(&alice);
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||
let acc = Arc::new(Tpu::new(acc, input, historian));
|
||||
let threads = Tpu::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap();
|
||||
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
|
||||
let accountant = Arc::new(Tpu::new(accounting_stage));
|
||||
let threads = Tpu::serve(
|
||||
&accountant,
|
||||
d,
|
||||
serve,
|
||||
events_socket,
|
||||
gossip,
|
||||
exit.clone(),
|
||||
sink(),
|
||||
).unwrap();
|
||||
sleep(Duration::from_millis(300));
|
||||
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
|
||||
let mut acc = ThinClient::new(addr, socket);
|
||||
let last_id = acc.get_last_id().wait().unwrap();
|
||||
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
||||
let mut accountant = ThinClient::new(addr, socket);
|
||||
let last_id = accountant.get_last_id().wait().unwrap();
|
||||
let _sig = accountant
|
||||
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
||||
.unwrap();
|
||||
let mut balance;
|
||||
let now = Instant::now();
|
||||
loop {
|
||||
balance = acc.get_balance(&bob_pubkey);
|
||||
balance = accountant.get_balance(&bob_pubkey);
|
||||
if balance.is_ok() {
|
||||
break;
|
||||
}
|
||||
|
@ -213,10 +220,58 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bad_sig() {
|
||||
let (leader_data, leader_gossip, _, leader_serve, leader_events) = tpu::test_node();
|
||||
let alice = Mint::new(10_000);
|
||||
let accountant = Accountant::new(&alice);
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
|
||||
let tpu = Arc::new(Tpu::new(accounting_stage));
|
||||
let serve_addr = leader_serve.local_addr().unwrap();
|
||||
let threads = Tpu::serve(
|
||||
&tpu,
|
||||
leader_data,
|
||||
leader_serve,
|
||||
leader_events,
|
||||
leader_gossip,
|
||||
exit.clone(),
|
||||
sink(),
|
||||
).unwrap();
|
||||
sleep(Duration::from_millis(300));
|
||||
|
||||
let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
|
||||
let mut client = ThinClient::new(serve_addr, socket);
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
|
||||
trace!("doing stuff");
|
||||
|
||||
let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
|
||||
|
||||
let _sig = client.transfer_signed(tr).unwrap();
|
||||
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
|
||||
let mut tr2 = Transaction::new(&alice.keypair(), bob_pubkey, 501, last_id);
|
||||
tr2.data.tokens = 502;
|
||||
tr2.data.plan = Plan::new_payment(502, bob_pubkey);
|
||||
let _sig = client.transfer_signed(tr2).unwrap();
|
||||
|
||||
assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 500);
|
||||
trace!("exiting");
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
trace!("joining threads");
|
||||
for t in threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
||||
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let skinny = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
let leader = ReplicatedData::new(
|
||||
|
@ -225,10 +280,11 @@ mod tests {
|
|||
replicate.local_addr().unwrap(),
|
||||
serve.local_addr().unwrap(),
|
||||
);
|
||||
(leader, gossip, serve, replicate, skinny)
|
||||
(leader, gossip, serve, replicate, events_socket)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_multi_node() {
|
||||
logger::setup();
|
||||
info!("test_multi_node");
|
||||
|
@ -239,17 +295,15 @@ mod tests {
|
|||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let leader_acc = {
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||
let acc = Accountant::new(&alice);
|
||||
Arc::new(Tpu::new(acc, input, historian))
|
||||
let accountant = Accountant::new(&alice);
|
||||
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
|
||||
Arc::new(Tpu::new(accounting_stage))
|
||||
};
|
||||
|
||||
let replicant_acc = {
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||
let acc = Accountant::new(&alice);
|
||||
Arc::new(Tpu::new(acc, input, historian))
|
||||
let accountant = Accountant::new(&alice);
|
||||
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
|
||||
Arc::new(Tpu::new(accounting_stage))
|
||||
};
|
||||
|
||||
let leader_threads = Tpu::serve(
|
||||
|
@ -313,14 +367,15 @@ mod tests {
|
|||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
||||
|
||||
let mut acc = ThinClient::new(leader.0.serve_addr, socket);
|
||||
let mut accountant = ThinClient::new(leader.0.serve_addr, socket);
|
||||
info!("getting leader last_id");
|
||||
let last_id = acc.get_last_id().wait().unwrap();
|
||||
let last_id = accountant.get_last_id().wait().unwrap();
|
||||
info!("executing leader transer");
|
||||
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
||||
let _sig = accountant
|
||||
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
||||
.unwrap();
|
||||
info!("getting leader balance");
|
||||
acc.get_balance(&bob_pubkey).unwrap()
|
||||
accountant.get_balance(&bob_pubkey).unwrap()
|
||||
};
|
||||
assert_eq!(leader_balance, 500);
|
||||
//verify replicant has the same balance
|
||||
|
@ -329,9 +384,9 @@ mod tests {
|
|||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
||||
|
||||
let mut acc = ThinClient::new(replicant.0.serve_addr, socket);
|
||||
let mut accountant = ThinClient::new(replicant.0.serve_addr, socket);
|
||||
info!("getting replicant balance");
|
||||
if let Ok(bal) = acc.get_balance(&bob_pubkey) {
|
||||
if let Ok(bal) = accountant.get_balance(&bob_pubkey) {
|
||||
replicant_balance = bal;
|
||||
}
|
||||
info!("replicant balance {}", replicant_balance);
|
||||
|
|
|
@ -0,0 +1,334 @@
|
|||
//! The `thin_client_service` sits alongside the TPU and queries it for information
|
||||
//! on behalf of thing clients.
|
||||
|
||||
use accountant::Accountant;
|
||||
use accounting_stage::AccountingStage;
|
||||
use bincode::{deserialize, serialize};
|
||||
use entry::Entry;
|
||||
use event::Event;
|
||||
use hash::Hash;
|
||||
use packet;
|
||||
use packet::SharedPackets;
|
||||
use rayon::prelude::*;
|
||||
use result::Result;
|
||||
use signature::PublicKey;
|
||||
use std::collections::VecDeque;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use transaction::Transaction;
|
||||
//use std::io::{Cursor, Write};
|
||||
//use std::sync::atomic::{AtomicBool, Ordering};
|
||||
//use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::sync::mpsc::Receiver;
|
||||
use std::sync::{Arc, Mutex};
|
||||
//use std::thread::{spawn, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use streamer;
|
||||
use timing;
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum Request {
|
||||
Transaction(Transaction),
|
||||
GetBalance { key: PublicKey },
|
||||
Subscribe { subscriptions: Vec<Subscription> },
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum Subscription {
|
||||
EntryInfo,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct EntryInfo {
|
||||
pub id: Hash,
|
||||
pub num_hashes: u64,
|
||||
pub num_events: u64,
|
||||
}
|
||||
|
||||
impl Request {
|
||||
/// Verify the request is valid.
|
||||
pub fn verify(&self) -> bool {
|
||||
match *self {
|
||||
Request::Transaction(ref tr) => tr.verify_plan(),
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub enum Response {
|
||||
Balance { key: PublicKey, val: Option<i64> },
|
||||
EntryInfo(EntryInfo),
|
||||
}
|
||||
|
||||
pub struct ThinClientService {
|
||||
//pub output: Mutex<Receiver<Response>>,
|
||||
//response_sender: Mutex<Sender<Response>>,
|
||||
accountant: Arc<Accountant>,
|
||||
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
||||
}
|
||||
|
||||
impl ThinClientService {
|
||||
/// Create a new Tpu that wraps the given Accountant.
|
||||
pub fn new(accountant: Arc<Accountant>) -> Self {
|
||||
//let (response_sender, output) = channel();
|
||||
ThinClientService {
|
||||
//output: Mutex::new(output),
|
||||
//response_sender: Mutex::new(response_sender),
|
||||
accountant,
|
||||
entry_info_subscribers: Mutex::new(vec![]),
|
||||
}
|
||||
}
|
||||
|
||||
/// Process Request items sent by clients.
|
||||
fn process_request(
|
||||
&self,
|
||||
msg: Request,
|
||||
rsp_addr: SocketAddr,
|
||||
) -> Option<(Response, SocketAddr)> {
|
||||
match msg {
|
||||
Request::GetBalance { key } => {
|
||||
let val = self.accountant.get_balance(&key);
|
||||
let rsp = (Response::Balance { key, val }, rsp_addr);
|
||||
info!("Response::Balance {:?}", rsp);
|
||||
Some(rsp)
|
||||
}
|
||||
Request::Transaction(_) => unreachable!(),
|
||||
Request::Subscribe { subscriptions } => {
|
||||
for subscription in subscriptions {
|
||||
match subscription {
|
||||
Subscription::EntryInfo => {
|
||||
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn process_requests(
|
||||
&self,
|
||||
reqs: Vec<(Request, SocketAddr)>,
|
||||
) -> Vec<(Response, SocketAddr)> {
|
||||
reqs.into_iter()
|
||||
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
|
||||
// TODO: No need to bind().
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
|
||||
|
||||
// copy subscribers to avoid taking lock while doing io
|
||||
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
|
||||
trace!("Sending to {} addrs", addrs.len());
|
||||
for addr in addrs {
|
||||
let entry_info = EntryInfo {
|
||||
id: entry.id,
|
||||
num_hashes: entry.num_hashes,
|
||||
num_events: entry.events.len() as u64,
|
||||
};
|
||||
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
|
||||
trace!("sending {} to {}", data.len(), addr);
|
||||
//TODO dont do IO here, this needs to be on a separate channel
|
||||
let res = socket.send_to(&data, addr);
|
||||
if res.is_err() {
|
||||
eprintln!("couldn't send response: {:?}", res);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
|
||||
p.packets
|
||||
.par_iter()
|
||||
.map(|x| {
|
||||
deserialize(&x.data[0..x.meta.size])
|
||||
.map(|req| (req, x.meta.addr()))
|
||||
.ok()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
// Copy-paste of deserialize_requests() because I can't figure out how to
|
||||
// route the lifetimes in a generic version.
|
||||
pub fn deserialize_events(p: &packet::Packets) -> Vec<Option<(Event, SocketAddr)>> {
|
||||
p.packets
|
||||
.par_iter()
|
||||
.map(|x| {
|
||||
deserialize(&x.data[0..x.meta.size])
|
||||
.map(|req| (req, x.meta.addr()))
|
||||
.ok()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Split Request list into verified transactions and the rest
|
||||
fn partition_requests(
|
||||
req_vers: Vec<(Request, SocketAddr, u8)>,
|
||||
) -> (Vec<Event>, Vec<(Request, SocketAddr)>) {
|
||||
let mut events = vec![];
|
||||
let mut reqs = vec![];
|
||||
for (msg, rsp_addr, verify) in req_vers {
|
||||
match msg {
|
||||
Request::Transaction(tr) => {
|
||||
if verify != 0 {
|
||||
events.push(Event::Transaction(tr));
|
||||
}
|
||||
}
|
||||
_ => reqs.push((msg, rsp_addr)),
|
||||
}
|
||||
}
|
||||
(events, reqs)
|
||||
}
|
||||
|
||||
fn serialize_response(
|
||||
resp: Response,
|
||||
rsp_addr: SocketAddr,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<packet::SharedBlob> {
|
||||
let blob = blob_recycler.allocate();
|
||||
{
|
||||
let mut b = blob.write().unwrap();
|
||||
let v = serialize(&resp)?;
|
||||
let len = v.len();
|
||||
b.data[..len].copy_from_slice(&v);
|
||||
b.meta.size = len;
|
||||
b.meta.set_addr(&rsp_addr);
|
||||
}
|
||||
Ok(blob)
|
||||
}
|
||||
|
||||
fn serialize_responses(
|
||||
rsps: Vec<(Response, SocketAddr)>,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<VecDeque<packet::SharedBlob>> {
|
||||
let mut blobs = VecDeque::new();
|
||||
for (resp, rsp_addr) in rsps {
|
||||
blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?);
|
||||
}
|
||||
Ok(blobs)
|
||||
}
|
||||
|
||||
pub fn process_request_packets(
|
||||
&self,
|
||||
accounting_stage: &AccountingStage,
|
||||
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
||||
responder_sender: &streamer::BlobSender,
|
||||
packet_recycler: &packet::PacketRecycler,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let recv_start = Instant::now();
|
||||
let mms = verified_receiver.recv_timeout(timer)?;
|
||||
let mut reqs_len = 0;
|
||||
let mms_len = mms.len();
|
||||
info!(
|
||||
"@{:?} process start stalled for: {:?}ms batches: {}",
|
||||
timing::timestamp(),
|
||||
timing::duration_as_ms(&recv_start.elapsed()),
|
||||
mms.len(),
|
||||
);
|
||||
let proc_start = Instant::now();
|
||||
for (msgs, vers) in mms {
|
||||
let reqs = Self::deserialize_requests(&msgs.read().unwrap());
|
||||
reqs_len += reqs.len();
|
||||
let req_vers = reqs.into_iter()
|
||||
.zip(vers)
|
||||
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
|
||||
.filter(|x| {
|
||||
let v = x.0.verify();
|
||||
v
|
||||
})
|
||||
.collect();
|
||||
|
||||
debug!("partitioning");
|
||||
let (events, reqs) = Self::partition_requests(req_vers);
|
||||
debug!("events: {} reqs: {}", events.len(), reqs.len());
|
||||
|
||||
debug!("process_events");
|
||||
accounting_stage.process_events(events)?;
|
||||
debug!("done process_events");
|
||||
|
||||
debug!("process_requests");
|
||||
let rsps = self.process_requests(reqs);
|
||||
debug!("done process_requests");
|
||||
|
||||
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
||||
if !blobs.is_empty() {
|
||||
info!("process: sending blobs: {}", blobs.len());
|
||||
//don't wake up the other side if there is nothing
|
||||
responder_sender.send(blobs)?;
|
||||
}
|
||||
packet_recycler.recycle(msgs);
|
||||
}
|
||||
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
|
||||
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
|
||||
info!(
|
||||
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
|
||||
timing::timestamp(),
|
||||
mms_len,
|
||||
total_time_ms,
|
||||
reqs_len,
|
||||
(reqs_len as f32) / (total_time_s)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
|
||||
let mut out = vec![];
|
||||
for rrs in reqs.chunks(packet::NUM_PACKETS) {
|
||||
let p = r.allocate();
|
||||
p.write()
|
||||
.unwrap()
|
||||
.packets
|
||||
.resize(rrs.len(), Default::default());
|
||||
for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) {
|
||||
let v = serialize(&i).expect("serialize request");
|
||||
let len = v.len();
|
||||
o.data[..len].copy_from_slice(&v);
|
||||
o.meta.size = len;
|
||||
}
|
||||
out.push(p);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bincode::serialize;
|
||||
use ecdsa;
|
||||
use packet::{PacketRecycler, NUM_PACKETS};
|
||||
use thin_client_service::{to_request_packets, Request};
|
||||
use transaction::{memfind, test_tx};
|
||||
|
||||
#[test]
|
||||
fn test_layout() {
|
||||
let tr = test_tx();
|
||||
let tx = serialize(&tr).unwrap();
|
||||
let packet = serialize(&Request::Transaction(tr)).unwrap();
|
||||
assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
|
||||
assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_to_packets() {
|
||||
let tr = Request::Transaction(test_tx());
|
||||
let re = PacketRecycler::default();
|
||||
let rv = to_request_packets(&re, vec![tr.clone(); 1]);
|
||||
assert_eq!(rv.len(), 1);
|
||||
assert_eq!(rv[0].read().unwrap().packets.len(), 1);
|
||||
|
||||
let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]);
|
||||
assert_eq!(rv.len(), 1);
|
||||
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
|
||||
|
||||
let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]);
|
||||
assert_eq!(rv.len(), 2);
|
||||
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
|
||||
assert_eq!(rv[1].read().unwrap().packets.len(), 1);
|
||||
}
|
||||
}
|
734
src/tpu.rs
734
src/tpu.rs
|
@ -1,195 +1,90 @@
|
|||
//! The `tpu` module implements the Transaction Processing Unit, a
|
||||
//! 5-stage transaction processing pipeline in software.
|
||||
|
||||
use accountant::Accountant;
|
||||
use bincode::{deserialize, serialize, serialize_into};
|
||||
use accounting_stage::AccountingStage;
|
||||
use crdt::{Crdt, ReplicatedData};
|
||||
use ecdsa;
|
||||
use entry::Entry;
|
||||
use event::Event;
|
||||
use hash::Hash;
|
||||
use historian::Historian;
|
||||
use ledger;
|
||||
use packet;
|
||||
use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
|
||||
use packet::SharedPackets;
|
||||
use rand::{thread_rng, Rng};
|
||||
use rayon::prelude::*;
|
||||
use recorder::Signal;
|
||||
use result::Result;
|
||||
use serde_json;
|
||||
use signature::PublicKey;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::Write;
|
||||
use std::io::sink;
|
||||
use std::io::{Cursor, Write};
|
||||
use std::mem::size_of;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{channel, Receiver, Sender, SyncSender};
|
||||
use std::sync::mpsc::{channel, Sender};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use streamer;
|
||||
use thin_client_service::ThinClientService;
|
||||
use timing;
|
||||
use transaction::Transaction;
|
||||
|
||||
pub struct Tpu {
|
||||
acc: Mutex<Accountant>,
|
||||
historian_input: Mutex<SyncSender<Signal>>,
|
||||
historian: Historian,
|
||||
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
||||
}
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum Request {
|
||||
Transaction(Transaction),
|
||||
GetBalance { key: PublicKey },
|
||||
Subscribe { subscriptions: Vec<Subscription> },
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum Subscription {
|
||||
EntryInfo,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct EntryInfo {
|
||||
pub id: Hash,
|
||||
pub num_hashes: u64,
|
||||
pub num_events: u64,
|
||||
}
|
||||
|
||||
impl Request {
|
||||
/// Verify the request is valid.
|
||||
pub fn verify(&self) -> bool {
|
||||
match *self {
|
||||
Request::Transaction(ref tr) => tr.verify_plan(),
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
accounting_stage: AccountingStage,
|
||||
thin_client_service: ThinClientService,
|
||||
}
|
||||
|
||||
type SharedTpu = Arc<Tpu>;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub enum Response {
|
||||
Balance { key: PublicKey, val: Option<i64> },
|
||||
EntryInfo(EntryInfo),
|
||||
}
|
||||
|
||||
impl Tpu {
|
||||
/// Create a new Tpu that wraps the given Accountant.
|
||||
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
|
||||
pub fn new(accounting_stage: AccountingStage) -> Self {
|
||||
let thin_client_service = ThinClientService::new(accounting_stage.accountant.clone());
|
||||
Tpu {
|
||||
acc: Mutex::new(acc),
|
||||
entry_info_subscribers: Mutex::new(vec![]),
|
||||
historian_input: Mutex::new(historian_input),
|
||||
historian,
|
||||
accounting_stage,
|
||||
thin_client_service,
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_entry_info_subscribers(obj: &SharedTpu, entry: &Entry) {
|
||||
// TODO: No need to bind().
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
|
||||
|
||||
// copy subscribers to avoid taking lock while doing io
|
||||
let addrs = obj.entry_info_subscribers.lock().unwrap().clone();
|
||||
trace!("Sending to {} addrs", addrs.len());
|
||||
for addr in addrs {
|
||||
let entry_info = EntryInfo {
|
||||
id: entry.id,
|
||||
num_hashes: entry.num_hashes,
|
||||
num_events: entry.events.len() as u64,
|
||||
};
|
||||
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
|
||||
trace!("sending {} to {}", data.len(), addr);
|
||||
//TODO dont do IO here, this needs to be on a separate channel
|
||||
let res = socket.send_to(&data, addr);
|
||||
if res.is_err() {
|
||||
eprintln!("couldn't send response: {:?}", res);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
|
||||
trace!("update_entry entry");
|
||||
obj.acc.lock().unwrap().register_entry_id(&entry.id);
|
||||
fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
|
||||
trace!("write_entry entry");
|
||||
self.accounting_stage
|
||||
.accountant
|
||||
.register_entry_id(&entry.id);
|
||||
writeln!(
|
||||
writer.lock().unwrap(),
|
||||
"{}",
|
||||
serde_json::to_string(&entry).unwrap()
|
||||
).unwrap();
|
||||
Self::notify_entry_info_subscribers(obj, &entry);
|
||||
self.thin_client_service
|
||||
.notify_entry_info_subscribers(&entry);
|
||||
}
|
||||
|
||||
fn receive_all<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
|
||||
fn write_entries<W: Write>(&self, writer: &Mutex<W>) -> Result<Vec<Entry>> {
|
||||
//TODO implement a serialize for channel that does this without allocations
|
||||
let mut l = vec![];
|
||||
let entry = obj.historian
|
||||
let entry = self.accounting_stage
|
||||
.output
|
||||
.lock()
|
||||
.unwrap()
|
||||
.recv_timeout(Duration::new(1, 0))?;
|
||||
Self::update_entry(obj, writer, &entry);
|
||||
self.write_entry(writer, &entry);
|
||||
l.push(entry);
|
||||
while let Ok(entry) = obj.historian.receive() {
|
||||
Self::update_entry(obj, writer, &entry);
|
||||
while let Ok(entry) = self.accounting_stage.output.lock().unwrap().try_recv() {
|
||||
self.write_entry(writer, &entry);
|
||||
l.push(entry);
|
||||
}
|
||||
Ok(l)
|
||||
}
|
||||
|
||||
fn process_entry_list_into_blobs(
|
||||
list: &Vec<Entry>,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
q: &mut VecDeque<SharedBlob>,
|
||||
) {
|
||||
let mut start = 0;
|
||||
let mut end = 0;
|
||||
while start < list.len() {
|
||||
let mut total = 0;
|
||||
for i in &list[start..] {
|
||||
total += size_of::<Event>() * i.events.len();
|
||||
total += size_of::<Entry>();
|
||||
if total >= BLOB_SIZE {
|
||||
break;
|
||||
}
|
||||
end += 1;
|
||||
}
|
||||
// See that we made progress and a single
|
||||
// vec of Events wasn't too big for a single packet
|
||||
if end <= start {
|
||||
// Trust the recorder to not package more than we can
|
||||
// serialize
|
||||
end += 1;
|
||||
}
|
||||
|
||||
let b = blob_recycler.allocate();
|
||||
let pos = {
|
||||
let mut bd = b.write().unwrap();
|
||||
let mut out = Cursor::new(bd.data_mut());
|
||||
serialize_into(&mut out, &list[start..end]).expect("failed to serialize output");
|
||||
out.position() as usize
|
||||
};
|
||||
assert!(pos < BLOB_SIZE);
|
||||
b.write().unwrap().set_size(pos);
|
||||
q.push_back(b);
|
||||
start = end;
|
||||
}
|
||||
}
|
||||
|
||||
/// Process any Entry items that have been published by the Historian.
|
||||
/// continuosly broadcast blobs of entries out
|
||||
fn run_sync<W: Write>(
|
||||
obj: SharedTpu,
|
||||
&self,
|
||||
broadcast: &streamer::BlobSender,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
writer: &Arc<Mutex<W>>,
|
||||
writer: &Mutex<W>,
|
||||
) -> Result<()> {
|
||||
let mut q = VecDeque::new();
|
||||
let list = Self::receive_all(&obj, writer)?;
|
||||
let list = self.write_entries(writer)?;
|
||||
trace!("New blobs? {}", list.len());
|
||||
Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
|
||||
ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
|
||||
if !q.is_empty() {
|
||||
broadcast.send(q)?;
|
||||
}
|
||||
|
@ -201,28 +96,10 @@ impl Tpu {
|
|||
exit: Arc<AtomicBool>,
|
||||
broadcast: streamer::BlobSender,
|
||||
blob_recycler: packet::BlobRecycler,
|
||||
writer: Arc<Mutex<W>>,
|
||||
writer: Mutex<W>,
|
||||
) -> JoinHandle<()> {
|
||||
spawn(move || loop {
|
||||
let _ = Self::run_sync(obj.clone(), &broadcast, &blob_recycler, &writer);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
info!("sync_service exiting");
|
||||
break;
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn process_thin_client_requests(_obj: SharedTpu, _socket: &UdpSocket) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn thin_client_service(
|
||||
obj: SharedTpu,
|
||||
exit: Arc<AtomicBool>,
|
||||
socket: UdpSocket,
|
||||
) -> JoinHandle<()> {
|
||||
spawn(move || loop {
|
||||
let _ = Self::process_thin_client_requests(obj.clone(), &socket);
|
||||
let _ = obj.run_sync(&broadcast, &blob_recycler, &writer);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
info!("sync_service exiting");
|
||||
break;
|
||||
|
@ -232,14 +109,14 @@ impl Tpu {
|
|||
|
||||
/// Process any Entry items that have been published by the Historian.
|
||||
/// continuosly broadcast blobs of entries out
|
||||
fn run_sync_no_broadcast(obj: SharedTpu) -> Result<()> {
|
||||
Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?;
|
||||
fn run_sync_no_broadcast(&self) -> Result<()> {
|
||||
self.write_entries(&Arc::new(Mutex::new(sink())))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn sync_no_broadcast_service(obj: SharedTpu, exit: Arc<AtomicBool>) -> JoinHandle<()> {
|
||||
spawn(move || loop {
|
||||
let _ = Self::run_sync_no_broadcast(obj.clone());
|
||||
let _ = obj.run_sync_no_broadcast();
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
info!("sync_no_broadcast_service exiting");
|
||||
break;
|
||||
|
@ -247,52 +124,6 @@ impl Tpu {
|
|||
})
|
||||
}
|
||||
|
||||
/// Process Request items sent by clients.
|
||||
pub fn process_request(
|
||||
&self,
|
||||
msg: Request,
|
||||
rsp_addr: SocketAddr,
|
||||
) -> Option<(Response, SocketAddr)> {
|
||||
match msg {
|
||||
Request::GetBalance { key } => {
|
||||
let val = self.acc.lock().unwrap().get_balance(&key);
|
||||
let rsp = (Response::Balance { key, val }, rsp_addr);
|
||||
info!("Response::Balance {:?}", rsp);
|
||||
Some(rsp)
|
||||
}
|
||||
Request::Transaction(_) => unreachable!(),
|
||||
Request::Subscribe { subscriptions } => {
|
||||
for subscription in subscriptions {
|
||||
match subscription {
|
||||
Subscription::EntryInfo => {
|
||||
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let msgs = recvr.recv_timeout(timer)?;
|
||||
debug!("got msgs");
|
||||
let mut len = msgs.read().unwrap().packets.len();
|
||||
let mut batch = vec![msgs];
|
||||
while let Ok(more) = recvr.try_recv() {
|
||||
trace!("got more msgs");
|
||||
len += more.read().unwrap().packets.len();
|
||||
batch.push(more);
|
||||
|
||||
if len > 100_000 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
debug!("batch len {}", batch.len());
|
||||
Ok((batch, len))
|
||||
}
|
||||
|
||||
fn verify_batch(
|
||||
batch: Vec<SharedPackets>,
|
||||
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
|
||||
|
@ -308,7 +139,7 @@ impl Tpu {
|
|||
recvr: &Arc<Mutex<streamer::PacketReceiver>>,
|
||||
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
|
||||
) -> Result<()> {
|
||||
let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?;
|
||||
let (batch, len) = streamer::recv_batch(&recvr.lock().unwrap())?;
|
||||
let now = Instant::now();
|
||||
let batch_len = batch.len();
|
||||
let rand_id = thread_rng().gen_range(0, 100);
|
||||
|
@ -335,174 +166,20 @@ impl Tpu {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn deserialize_packets(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
|
||||
p.packets
|
||||
.par_iter()
|
||||
.map(|x| {
|
||||
deserialize(&x.data[0..x.meta.size])
|
||||
.map(|req| (req, x.meta.addr()))
|
||||
.ok()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Split Request list into verified transactions and the rest
|
||||
fn partition_requests(
|
||||
req_vers: Vec<(Request, SocketAddr, u8)>,
|
||||
) -> (Vec<Event>, Vec<(Request, SocketAddr)>) {
|
||||
let mut events = vec![];
|
||||
let mut reqs = vec![];
|
||||
for (msg, rsp_addr, verify) in req_vers {
|
||||
match msg {
|
||||
Request::Transaction(tr) => {
|
||||
if verify != 0 {
|
||||
events.push(Event::Transaction(tr));
|
||||
}
|
||||
}
|
||||
_ => reqs.push((msg, rsp_addr)),
|
||||
}
|
||||
}
|
||||
(events, reqs)
|
||||
}
|
||||
|
||||
/// Process the transactions in parallel and then log the successful ones.
|
||||
fn process_events(&self, events: Vec<Event>) -> Result<()> {
|
||||
for result in self.acc.lock().unwrap().process_verified_events(events) {
|
||||
if let Ok(event) = result {
|
||||
self.historian_input
|
||||
.lock()
|
||||
.unwrap()
|
||||
.send(Signal::Event(event))?;
|
||||
}
|
||||
}
|
||||
|
||||
// Let validators know they should not attempt to process additional
|
||||
// transactions in parallel.
|
||||
self.historian_input.lock().unwrap().send(Signal::Tick)?;
|
||||
debug!("after historian_input");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_requests(&self, reqs: Vec<(Request, SocketAddr)>) -> Vec<(Response, SocketAddr)> {
|
||||
reqs.into_iter()
|
||||
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn serialize_response(
|
||||
resp: Response,
|
||||
rsp_addr: SocketAddr,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<packet::SharedBlob> {
|
||||
let blob = blob_recycler.allocate();
|
||||
{
|
||||
let mut b = blob.write().unwrap();
|
||||
let v = serialize(&resp)?;
|
||||
let len = v.len();
|
||||
b.data[..len].copy_from_slice(&v);
|
||||
b.meta.size = len;
|
||||
b.meta.set_addr(&rsp_addr);
|
||||
}
|
||||
Ok(blob)
|
||||
}
|
||||
|
||||
fn serialize_responses(
|
||||
rsps: Vec<(Response, SocketAddr)>,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<VecDeque<packet::SharedBlob>> {
|
||||
let mut blobs = VecDeque::new();
|
||||
for (resp, rsp_addr) in rsps {
|
||||
blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?);
|
||||
}
|
||||
Ok(blobs)
|
||||
}
|
||||
|
||||
fn process(
|
||||
obj: &SharedTpu,
|
||||
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
||||
responder_sender: &streamer::BlobSender,
|
||||
packet_recycler: &packet::PacketRecycler,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let recv_start = Instant::now();
|
||||
let mms = verified_receiver.recv_timeout(timer)?;
|
||||
let mut reqs_len = 0;
|
||||
let mms_len = mms.len();
|
||||
info!(
|
||||
"@{:?} process start stalled for: {:?}ms batches: {}",
|
||||
timing::timestamp(),
|
||||
timing::duration_as_ms(&recv_start.elapsed()),
|
||||
mms.len(),
|
||||
);
|
||||
let proc_start = Instant::now();
|
||||
for (msgs, vers) in mms {
|
||||
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
|
||||
reqs_len += reqs.len();
|
||||
let req_vers = reqs.into_iter()
|
||||
.zip(vers)
|
||||
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
|
||||
.filter(|x| {
|
||||
let v = x.0.verify();
|
||||
v
|
||||
})
|
||||
.collect();
|
||||
|
||||
debug!("partitioning");
|
||||
let (events, reqs) = Self::partition_requests(req_vers);
|
||||
debug!("events: {} reqs: {}", events.len(), reqs.len());
|
||||
|
||||
debug!("process_events");
|
||||
obj.process_events(events)?;
|
||||
debug!("done process_events");
|
||||
|
||||
debug!("process_requests");
|
||||
let rsps = obj.process_requests(reqs);
|
||||
debug!("done process_requests");
|
||||
|
||||
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
||||
if !blobs.is_empty() {
|
||||
info!("process: sending blobs: {}", blobs.len());
|
||||
//don't wake up the other side if there is nothing
|
||||
responder_sender.send(blobs)?;
|
||||
}
|
||||
packet_recycler.recycle(msgs);
|
||||
}
|
||||
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
|
||||
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
|
||||
info!(
|
||||
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
|
||||
timing::timestamp(),
|
||||
mms_len,
|
||||
total_time_ms,
|
||||
reqs_len,
|
||||
(reqs_len as f32) / (total_time_s)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
/// Process verified blobs, already in order
|
||||
/// Respond with a signed hash of the state
|
||||
fn replicate_state(
|
||||
obj: &SharedTpu,
|
||||
obj: &Tpu,
|
||||
verified_receiver: &streamer::BlobReceiver,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let blobs = verified_receiver.recv_timeout(timer)?;
|
||||
trace!("replicating blobs {}", blobs.len());
|
||||
for msgs in &blobs {
|
||||
let blob = msgs.read().unwrap();
|
||||
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
|
||||
let acc = obj.acc.lock().unwrap();
|
||||
for entry in entries {
|
||||
acc.register_entry_id(&entry.id);
|
||||
for result in acc.process_verified_events(entry.events) {
|
||||
result?;
|
||||
}
|
||||
}
|
||||
//TODO respond back to leader with hash of the state
|
||||
}
|
||||
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
|
||||
obj.accounting_stage
|
||||
.accountant
|
||||
.process_verified_entries(entries)?;
|
||||
for blob in blobs {
|
||||
blob_recycler.recycle(blob);
|
||||
}
|
||||
|
@ -516,7 +193,7 @@ impl Tpu {
|
|||
obj: &SharedTpu,
|
||||
me: ReplicatedData,
|
||||
serve: UdpSocket,
|
||||
skinny: UdpSocket,
|
||||
_events_socket: UdpSocket,
|
||||
gossip: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
writer: W,
|
||||
|
@ -576,15 +253,13 @@ impl Tpu {
|
|||
exit.clone(),
|
||||
broadcast_sender,
|
||||
blob_recycler.clone(),
|
||||
Arc::new(Mutex::new(writer)),
|
||||
Mutex::new(writer),
|
||||
);
|
||||
|
||||
let t_skinny = Self::thin_client_service(obj.clone(), exit.clone(), skinny);
|
||||
|
||||
let tpu = obj.clone();
|
||||
let t_server = spawn(move || loop {
|
||||
let e = Self::process(
|
||||
&mut tpu.clone(),
|
||||
let e = tpu.thin_client_service.process_request_packets(
|
||||
&tpu.accounting_stage,
|
||||
&verified_receiver,
|
||||
&responder_sender,
|
||||
&packet_recycler,
|
||||
|
@ -602,7 +277,6 @@ impl Tpu {
|
|||
t_responder,
|
||||
t_server,
|
||||
t_sync,
|
||||
t_skinny,
|
||||
t_gossip,
|
||||
t_listen,
|
||||
t_broadcast,
|
||||
|
@ -730,8 +404,8 @@ impl Tpu {
|
|||
let tpu = obj.clone();
|
||||
let s_exit = exit.clone();
|
||||
let t_server = spawn(move || loop {
|
||||
let e = Self::process(
|
||||
&mut tpu.clone(),
|
||||
let e = tpu.thin_client_service.process_request_packets(
|
||||
&tpu.accounting_stage,
|
||||
&verified_receiver,
|
||||
&responder_sender,
|
||||
&packet_recycler,
|
||||
|
@ -764,199 +438,49 @@ impl Tpu {
|
|||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn to_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
|
||||
let mut out = vec![];
|
||||
for rrs in reqs.chunks(packet::NUM_PACKETS) {
|
||||
let p = r.allocate();
|
||||
p.write()
|
||||
.unwrap()
|
||||
.packets
|
||||
.resize(rrs.len(), Default::default());
|
||||
for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) {
|
||||
let v = serialize(&i).expect("serialize request");
|
||||
let len = v.len();
|
||||
o.data[..len].copy_from_slice(&v);
|
||||
o.meta.size = len;
|
||||
}
|
||||
out.push(p);
|
||||
}
|
||||
return out;
|
||||
pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
|
||||
let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
let d = ReplicatedData::new(
|
||||
pubkey,
|
||||
gossip.local_addr().unwrap(),
|
||||
replicate.local_addr().unwrap(),
|
||||
serve.local_addr().unwrap(),
|
||||
);
|
||||
(d, gossip, replicate, serve, events_socket)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bincode::serialize;
|
||||
use ecdsa;
|
||||
use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS};
|
||||
use tpu::{to_packets, Request};
|
||||
use transaction::{memfind, test_tx};
|
||||
|
||||
use accountant::Accountant;
|
||||
use accounting_stage::AccountingStage;
|
||||
use bincode::serialize;
|
||||
use chrono::prelude::*;
|
||||
use crdt::Crdt;
|
||||
use crdt::ReplicatedData;
|
||||
use entry;
|
||||
use entry::Entry;
|
||||
use event::Event;
|
||||
use futures::Future;
|
||||
use hash::{hash, Hash};
|
||||
use historian::Historian;
|
||||
use logger;
|
||||
use mint::Mint;
|
||||
use plan::Plan;
|
||||
use recorder::Signal;
|
||||
use packet::BlobRecycler;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use std::collections::VecDeque;
|
||||
use std::io::sink;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use streamer;
|
||||
use thin_client::ThinClient;
|
||||
use tpu::Tpu;
|
||||
use tpu::{test_node, Tpu};
|
||||
use transaction::Transaction;
|
||||
|
||||
#[test]
|
||||
fn test_layout() {
|
||||
let tr = test_tx();
|
||||
let tx = serialize(&tr).unwrap();
|
||||
let packet = serialize(&Request::Transaction(tr)).unwrap();
|
||||
assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
|
||||
assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None);
|
||||
}
|
||||
#[test]
|
||||
fn test_to_packets() {
|
||||
let tr = Request::Transaction(test_tx());
|
||||
let re = PacketRecycler::default();
|
||||
let rv = to_packets(&re, vec![tr.clone(); 1]);
|
||||
assert_eq!(rv.len(), 1);
|
||||
assert_eq!(rv[0].read().unwrap().packets.len(), 1);
|
||||
|
||||
let rv = to_packets(&re, vec![tr.clone(); NUM_PACKETS]);
|
||||
assert_eq!(rv.len(), 1);
|
||||
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
|
||||
|
||||
let rv = to_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]);
|
||||
assert_eq!(rv.len(), 2);
|
||||
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
|
||||
assert_eq!(rv[1].read().unwrap().packets.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accounting_sequential_consistency() {
|
||||
// In this attack we'll demonstrate that a verifier can interpret the ledger
|
||||
// differently if either the server doesn't signal the ledger to add an
|
||||
// Entry OR if the verifier tries to parallelize across multiple Entries.
|
||||
let mint = Mint::new(2);
|
||||
let acc = Accountant::new(&mint);
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
||||
let tpu = Tpu::new(acc, input, historian);
|
||||
|
||||
// Process a batch that includes a transaction that receives two tokens.
|
||||
let alice = KeyPair::new();
|
||||
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
||||
let events = vec![Event::Transaction(tr)];
|
||||
assert!(tpu.process_events(events).is_ok());
|
||||
|
||||
// Process a second batch that spends one of those tokens.
|
||||
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
||||
let events = vec![Event::Transaction(tr)];
|
||||
assert!(tpu.process_events(events).is_ok());
|
||||
|
||||
// Collect the ledger and feed it to a new accountant.
|
||||
tpu.historian_input
|
||||
.lock()
|
||||
.unwrap()
|
||||
.send(Signal::Tick)
|
||||
.unwrap();
|
||||
drop(tpu.historian_input);
|
||||
let entries: Vec<Entry> = tpu.historian.output.lock().unwrap().iter().collect();
|
||||
|
||||
// Assert the user holds one token, not two. If the server only output one
|
||||
// entry, then the second transaction will be rejected, because it drives
|
||||
// the account balance below zero before the credit is added.
|
||||
let acc = Accountant::new(&mint);
|
||||
for entry in entries {
|
||||
assert!(
|
||||
acc.process_verified_events(entry.events)
|
||||
.into_iter()
|
||||
.all(|x| x.is_ok())
|
||||
);
|
||||
}
|
||||
assert_eq!(acc.get_balance(&alice.pubkey()), Some(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accountant_bad_sig() {
|
||||
let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = test_node();
|
||||
let alice = Mint::new(10_000);
|
||||
let acc = Accountant::new(&alice);
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||
let tpu = Arc::new(Tpu::new(acc, input, historian));
|
||||
let serve_addr = leader_serve.local_addr().unwrap();
|
||||
let threads = Tpu::serve(
|
||||
&tpu,
|
||||
leader_data,
|
||||
leader_serve,
|
||||
leader_skinny,
|
||||
leader_gossip,
|
||||
exit.clone(),
|
||||
sink(),
|
||||
).unwrap();
|
||||
sleep(Duration::from_millis(300));
|
||||
|
||||
let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
|
||||
let mut client = ThinClient::new(serve_addr, socket);
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
|
||||
trace!("doing stuff");
|
||||
|
||||
let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
|
||||
|
||||
let _sig = client.transfer_signed(tr).unwrap();
|
||||
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
|
||||
let mut tr2 = Transaction::new(&alice.keypair(), bob_pubkey, 501, last_id);
|
||||
tr2.data.tokens = 502;
|
||||
tr2.data.plan = Plan::new_payment(502, bob_pubkey);
|
||||
let _sig = client.transfer_signed(tr2).unwrap();
|
||||
|
||||
assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 500);
|
||||
trace!("exiting");
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
trace!("joining threads");
|
||||
for t in threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
|
||||
let skinny = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let pubkey = KeyPair::new().pubkey();
|
||||
let d = ReplicatedData::new(
|
||||
pubkey,
|
||||
gossip.local_addr().unwrap(),
|
||||
replicate.local_addr().unwrap(),
|
||||
serve.local_addr().unwrap(),
|
||||
);
|
||||
(d, gossip, replicate, serve, skinny)
|
||||
}
|
||||
|
||||
/// Test that mesasge sent from leader to target1 and repliated to target2
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_replicate() {
|
||||
logger::setup();
|
||||
let (leader_data, leader_gossip, _, leader_serve, _) = test_node();
|
||||
|
@ -1005,13 +529,12 @@ mod tests {
|
|||
|
||||
let starting_balance = 10_000;
|
||||
let alice = Mint::new(starting_balance);
|
||||
let acc = Accountant::new(&alice);
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
|
||||
let acc = Arc::new(Tpu::new(acc, input, historian));
|
||||
let accountant = Accountant::new(&alice);
|
||||
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
|
||||
let tpu = Arc::new(Tpu::new(accounting_stage));
|
||||
let replicate_addr = target1_data.replicate_addr;
|
||||
let threads = Tpu::replicate(
|
||||
&acc,
|
||||
&tpu,
|
||||
target1_data,
|
||||
target1_gossip,
|
||||
target1_serve,
|
||||
|
@ -1033,9 +556,11 @@ mod tests {
|
|||
w.set_index(i).unwrap();
|
||||
w.set_id(leader_id).unwrap();
|
||||
|
||||
let accountant = &tpu.accounting_stage.accountant;
|
||||
|
||||
let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
|
||||
let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]);
|
||||
acc.acc.lock().unwrap().register_entry_id(&cur_hash);
|
||||
accountant.register_entry_id(&cur_hash);
|
||||
cur_hash = hash(&cur_hash);
|
||||
|
||||
let tr1 = Transaction::new(
|
||||
|
@ -1044,11 +569,11 @@ mod tests {
|
|||
transfer_amount,
|
||||
cur_hash,
|
||||
);
|
||||
acc.acc.lock().unwrap().register_entry_id(&cur_hash);
|
||||
accountant.register_entry_id(&cur_hash);
|
||||
cur_hash = hash(&cur_hash);
|
||||
let entry1 =
|
||||
entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]);
|
||||
acc.acc.lock().unwrap().register_entry_id(&cur_hash);
|
||||
accountant.register_entry_id(&cur_hash);
|
||||
cur_hash = hash(&cur_hash);
|
||||
|
||||
alice_ref_balance -= transfer_amount;
|
||||
|
@ -1073,18 +598,11 @@ mod tests {
|
|||
msgs.push(msg);
|
||||
}
|
||||
|
||||
let alice_balance = acc.acc
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get_balance(&alice.keypair().pubkey())
|
||||
.unwrap();
|
||||
let accountant = &tpu.accounting_stage.accountant;
|
||||
let alice_balance = accountant.get_balance(&alice.keypair().pubkey()).unwrap();
|
||||
assert_eq!(alice_balance, alice_ref_balance);
|
||||
|
||||
let bob_balance = acc.acc
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get_balance(&bob_keypair.pubkey())
|
||||
.unwrap();
|
||||
let bob_balance = accountant.get_balance(&bob_keypair.pubkey()).unwrap();
|
||||
assert_eq!(bob_balance, starting_balance - alice_ref_balance);
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
|
@ -1099,100 +617,4 @@ mod tests {
|
|||
t_l_listen.join().expect("join");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_entry_to_blobs() {
|
||||
let zero = Hash::default();
|
||||
let keypair = KeyPair::new();
|
||||
let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero));
|
||||
let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
|
||||
let e0 = entry::create_entry(&zero, 0, vec![tr0.clone(), tr1.clone()]);
|
||||
|
||||
let entry_list = vec![e0; 1000];
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
let mut blob_q = VecDeque::new();
|
||||
Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
|
||||
let serialized_entry_list = serialize(&entry_list).unwrap();
|
||||
let mut num_blobs_ref = serialized_entry_list.len() / BLOB_SIZE;
|
||||
if serialized_entry_list.len() % BLOB_SIZE != 0 {
|
||||
num_blobs_ref += 1
|
||||
}
|
||||
trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref);
|
||||
assert!(blob_q.len() > num_blobs_ref);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "unstable", test))]
|
||||
mod bench {
|
||||
extern crate test;
|
||||
use self::test::Bencher;
|
||||
use accountant::{Accountant, MAX_ENTRY_IDS};
|
||||
use bincode::serialize;
|
||||
use hash::hash;
|
||||
use mint::Mint;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::time::Instant;
|
||||
use tpu::*;
|
||||
use transaction::Transaction;
|
||||
|
||||
#[bench]
|
||||
fn process_packets_bench(_bencher: &mut Bencher) {
|
||||
let mint = Mint::new(100_000_000);
|
||||
let acc = Accountant::new(&mint);
|
||||
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
|
||||
// Create transactions between unrelated parties.
|
||||
let txs = 100_000;
|
||||
let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
|
||||
let transactions: Vec<_> = (0..txs)
|
||||
.into_par_iter()
|
||||
.map(|i| {
|
||||
// Seed the 'to' account and a cell for its signature.
|
||||
let dummy_id = i % (MAX_ENTRY_IDS as i32);
|
||||
let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
|
||||
{
|
||||
let mut last_ids = last_ids.lock().unwrap();
|
||||
if !last_ids.contains(&last_id) {
|
||||
last_ids.insert(last_id);
|
||||
acc.register_entry_id(&last_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Seed the 'from' account.
|
||||
let rando0 = KeyPair::new();
|
||||
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
|
||||
acc.process_verified_transaction(&tr).unwrap();
|
||||
|
||||
let rando1 = KeyPair::new();
|
||||
let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
|
||||
acc.process_verified_transaction(&tr).unwrap();
|
||||
|
||||
// Finally, return a transaction that's unique
|
||||
Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let req_vers = transactions
|
||||
.into_iter()
|
||||
.map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8))
|
||||
.collect();
|
||||
|
||||
let (input, event_receiver) = sync_channel(10);
|
||||
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
||||
let tpu = Tpu::new(acc, input, historian);
|
||||
|
||||
let now = Instant::now();
|
||||
assert!(tpu.process_events(req_vers).is_ok());
|
||||
let duration = now.elapsed();
|
||||
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
|
||||
let tps = txs as f64 / sec;
|
||||
|
||||
// Ensure that all transactions were successfully logged.
|
||||
drop(tpu.historian_input);
|
||||
let entries: Vec<Entry> = tpu.historian.output.lock().unwrap().iter().collect();
|
||||
assert_eq!(entries.len(), 1);
|
||||
assert_eq!(entries[0].events.len(), txs as usize);
|
||||
|
||||
println!("{} tps", tps);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue