Merge pull request #117 from garious/parallelize-accountant
Enable parallelized accountant
This commit is contained in:
commit
ab74e7f24f
|
@ -18,7 +18,7 @@ use std::result;
|
|||
use std::sync::RwLock;
|
||||
use transaction::Transaction;
|
||||
|
||||
const MAX_ENTRY_IDS: usize = 1024 * 4;
|
||||
pub const MAX_ENTRY_IDS: usize = 1024 * 4;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum AccountingError {
|
||||
|
@ -152,12 +152,17 @@ impl Accountant {
|
|||
}
|
||||
|
||||
/// Process a batch of verified transactions.
|
||||
pub fn process_verified_transactions(&self, trs: &[Transaction]) -> Vec<Result<()>> {
|
||||
pub fn process_verified_transactions(&self, trs: Vec<Transaction>) -> Vec<Result<Transaction>> {
|
||||
// Run all debits first to filter out any transactions that can't be processed
|
||||
// in parallel deterministically.
|
||||
trs.par_iter()
|
||||
.map(|tr| self.process_verified_transaction_debits(tr).map(|_| tr))
|
||||
.map(|result| result.map(|tr| self.process_verified_transaction_credits(tr)))
|
||||
trs.into_par_iter()
|
||||
.map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr))
|
||||
.map(|result| {
|
||||
result.map(|tr| {
|
||||
self.process_verified_transaction_credits(&tr);
|
||||
tr
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
|
@ -448,7 +453,7 @@ mod bench {
|
|||
}
|
||||
|
||||
assert!(
|
||||
acc.process_verified_transactions(&transactions)
|
||||
acc.process_verified_transactions(transactions.clone())
|
||||
.iter()
|
||||
.all(|x| x.is_ok())
|
||||
);
|
||||
|
|
|
@ -21,7 +21,7 @@ use std::collections::VecDeque;
|
|||
use std::io::Write;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::time::Duration;
|
||||
|
@ -82,28 +82,18 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||
}
|
||||
|
||||
/// Process Request items sent by clients.
|
||||
pub fn log_verified_request(&mut self, msg: Request, verify: u8) -> Option<Response> {
|
||||
pub fn process_request(
|
||||
&mut self,
|
||||
msg: Request,
|
||||
rsp_addr: SocketAddr,
|
||||
) -> Option<(Response, SocketAddr)> {
|
||||
match msg {
|
||||
Request::Transaction(_) if verify == 0 => {
|
||||
trace!("Transaction failed sigverify");
|
||||
None
|
||||
}
|
||||
Request::Transaction(tr) => {
|
||||
if let Err(err) = self.acc.process_verified_transaction(&tr) {
|
||||
trace!("Transaction error: {:?}", err);
|
||||
} else if let Err(SendError(_)) = self.historian
|
||||
.sender
|
||||
.send(Signal::Event(Event::Transaction(tr.clone())))
|
||||
{
|
||||
error!("Channel send error");
|
||||
}
|
||||
None
|
||||
}
|
||||
Request::GetBalance { key } => {
|
||||
let val = self.acc.get_balance(&key);
|
||||
Some(Response::Balance { key, val })
|
||||
Some((Response::Balance { key, val }, rsp_addr))
|
||||
}
|
||||
Request::GetLastId => Some(Response::LastId { id: self.sync() }),
|
||||
Request::GetLastId => Some((Response::LastId { id: self.sync() }, rsp_addr)),
|
||||
Request::Transaction(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -155,18 +145,46 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||
.collect()
|
||||
}
|
||||
|
||||
fn process_packets(
|
||||
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||
/// Split Request list into verified transactions and the rest
|
||||
fn partition_requests(
|
||||
req_vers: Vec<(Request, SocketAddr, u8)>,
|
||||
) -> Vec<(Response, SocketAddr)> {
|
||||
req_vers
|
||||
.into_iter()
|
||||
.filter_map(|(req, rsp_addr, v)| {
|
||||
let mut skel = obj.lock().unwrap();
|
||||
skel.log_verified_request(req, v)
|
||||
.map(|resp| (resp, rsp_addr))
|
||||
})
|
||||
.collect()
|
||||
) -> (Vec<Transaction>, Vec<(Request, SocketAddr)>) {
|
||||
let mut trs = vec![];
|
||||
let mut reqs = vec![];
|
||||
for (msg, rsp_addr, verify) in req_vers {
|
||||
match msg {
|
||||
Request::Transaction(tr) => {
|
||||
if verify != 0 {
|
||||
trs.push(tr);
|
||||
}
|
||||
}
|
||||
_ => reqs.push((msg, rsp_addr)),
|
||||
}
|
||||
}
|
||||
(trs, reqs)
|
||||
}
|
||||
|
||||
fn process_packets(
|
||||
&mut self,
|
||||
req_vers: Vec<(Request, SocketAddr, u8)>,
|
||||
) -> Result<Vec<(Response, SocketAddr)>> {
|
||||
let (trs, reqs) = Self::partition_requests(req_vers);
|
||||
|
||||
// Process the transactions in parallel and then log the successful ones.
|
||||
for result in self.acc.process_verified_transactions(trs) {
|
||||
if let Ok(tr) = result {
|
||||
self.historian
|
||||
.sender
|
||||
.send(Signal::Event(Event::Transaction(tr)))?;
|
||||
}
|
||||
}
|
||||
|
||||
// Process the remaining requests serially.
|
||||
let rsps = reqs.into_iter()
|
||||
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
|
||||
.collect();
|
||||
|
||||
Ok(rsps)
|
||||
}
|
||||
|
||||
fn serialize_response(
|
||||
|
@ -213,7 +231,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
|
||||
.filter(|x| x.0.verify())
|
||||
.collect();
|
||||
let rsps = Self::process_packets(obj, req_vers);
|
||||
let rsps = obj.lock().unwrap().process_packets(req_vers)?;
|
||||
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
||||
if !blobs.is_empty() {
|
||||
//don't wake up the other side if there is nothing
|
||||
|
@ -286,3 +304,79 @@ mod tests {
|
|||
assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "unstable", test))]
|
||||
mod bench {
|
||||
extern crate test;
|
||||
use self::test::Bencher;
|
||||
use accountant_skel::*;
|
||||
use accountant::{Accountant, MAX_ENTRY_IDS};
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use mint::Mint;
|
||||
use transaction::Transaction;
|
||||
use std::collections::HashSet;
|
||||
use std::io::sink;
|
||||
use std::time::Instant;
|
||||
use bincode::serialize;
|
||||
use hash::hash;
|
||||
|
||||
#[bench]
|
||||
fn process_packets_bench(_bencher: &mut Bencher) {
|
||||
let mint = Mint::new(100_000_000);
|
||||
let acc = Accountant::new(&mint);
|
||||
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
|
||||
// Create transactions between unrelated parties.
|
||||
let txs = 100_000;
|
||||
let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
|
||||
let transactions: Vec<_> = (0..txs)
|
||||
.into_par_iter()
|
||||
.map(|i| {
|
||||
// Seed the 'to' account and a cell for its signature.
|
||||
let dummy_id = i % (MAX_ENTRY_IDS as i32);
|
||||
let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
|
||||
{
|
||||
let mut last_ids = last_ids.lock().unwrap();
|
||||
if !last_ids.contains(&last_id) {
|
||||
last_ids.insert(last_id);
|
||||
acc.register_entry_id(&last_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Seed the 'from' account.
|
||||
let rando0 = KeyPair::new();
|
||||
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
|
||||
acc.process_verified_transaction(&tr).unwrap();
|
||||
|
||||
let rando1 = KeyPair::new();
|
||||
let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
|
||||
acc.process_verified_transaction(&tr).unwrap();
|
||||
|
||||
// Finally, return a transaction that's unique
|
||||
Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let req_vers = transactions
|
||||
.into_iter()
|
||||
.map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8))
|
||||
.collect();
|
||||
|
||||
let historian = Historian::new(&mint.last_id(), None);
|
||||
let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian);
|
||||
|
||||
let now = Instant::now();
|
||||
assert!(skel.process_packets(req_vers).is_ok());
|
||||
let duration = now.elapsed();
|
||||
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
|
||||
let tps = txs as f64 / sec;
|
||||
|
||||
// Ensure that all transactions were successfully logged.
|
||||
skel.historian.sender.send(Signal::Tick).unwrap();
|
||||
drop(skel.historian.sender);
|
||||
let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
|
||||
assert_eq!(entries.len(), 1);
|
||||
assert_eq!(entries[0].events.len(), txs as usize);
|
||||
|
||||
println!("{} tps", tps);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue