Merge pull request #178 from garious/split-request
Refactoring for upcoming thin client port
This commit is contained in:
commit
9e8ec86fa3
|
@ -218,13 +218,18 @@ impl Accountant {
|
||||||
(trs, rest)
|
(trs, rest)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn process_verified_events(&self, events: Vec<Event>) -> Result<()> {
|
pub fn process_verified_events(&self, events: Vec<Event>) -> Vec<Result<Event>> {
|
||||||
let (trs, rest) = Self::partition_events(events);
|
let (trs, rest) = Self::partition_events(events);
|
||||||
self.process_verified_transactions(trs);
|
let mut results: Vec<_> = self.process_verified_transactions(trs)
|
||||||
|
.into_iter()
|
||||||
|
.map(|x| x.map(Event::Transaction))
|
||||||
|
.collect();
|
||||||
|
|
||||||
for event in rest {
|
for event in rest {
|
||||||
self.process_verified_event(&event)?;
|
results.push(self.process_verified_event(event));
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
|
results
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process a Witness Signature that has already been verified.
|
/// Process a Witness Signature that has already been verified.
|
||||||
|
@ -278,12 +283,13 @@ impl Accountant {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process an Transaction or Witness that has already been verified.
|
/// Process an Transaction or Witness that has already been verified.
|
||||||
pub fn process_verified_event(&self, event: &Event) -> Result<()> {
|
pub fn process_verified_event(&self, event: Event) -> Result<Event> {
|
||||||
match *event {
|
match event {
|
||||||
Event::Transaction(ref tr) => self.process_verified_transaction(tr),
|
Event::Transaction(ref tr) => self.process_verified_transaction(tr),
|
||||||
Event::Signature { from, tx_sig, .. } => self.process_verified_sig(from, tx_sig),
|
Event::Signature { from, tx_sig, .. } => self.process_verified_sig(from, tx_sig),
|
||||||
Event::Timestamp { from, dt, .. } => self.process_verified_timestamp(from, dt),
|
Event::Timestamp { from, dt, .. } => self.process_verified_timestamp(from, dt),
|
||||||
}
|
}?;
|
||||||
|
Ok(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create, sign, and process a Transaction from `keypair` to `to` of
|
/// Create, sign, and process a Transaction from `keypair` to `to` of
|
||||||
|
|
|
@ -308,37 +308,30 @@ impl AccountantSkel {
|
||||||
/// Split Request list into verified transactions and the rest
|
/// Split Request list into verified transactions and the rest
|
||||||
fn partition_requests(
|
fn partition_requests(
|
||||||
req_vers: Vec<(Request, SocketAddr, u8)>,
|
req_vers: Vec<(Request, SocketAddr, u8)>,
|
||||||
) -> (Vec<Transaction>, Vec<(Request, SocketAddr)>) {
|
) -> (Vec<Event>, Vec<(Request, SocketAddr)>) {
|
||||||
let mut trs = vec![];
|
let mut events = vec![];
|
||||||
let mut reqs = vec![];
|
let mut reqs = vec![];
|
||||||
for (msg, rsp_addr, verify) in req_vers {
|
for (msg, rsp_addr, verify) in req_vers {
|
||||||
match msg {
|
match msg {
|
||||||
Request::Transaction(tr) => {
|
Request::Transaction(tr) => {
|
||||||
if verify != 0 {
|
if verify != 0 {
|
||||||
trs.push(tr);
|
events.push(Event::Transaction(tr));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => reqs.push((msg, rsp_addr)),
|
_ => reqs.push((msg, rsp_addr)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(trs, reqs)
|
(events, reqs)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_packets(
|
/// Process the transactions in parallel and then log the successful ones.
|
||||||
&self,
|
fn process_events(&self, events: Vec<Event>) -> Result<()> {
|
||||||
req_vers: Vec<(Request, SocketAddr, u8)>,
|
for result in self.acc.lock().unwrap().process_verified_events(events) {
|
||||||
) -> Result<Vec<(Response, SocketAddr)>> {
|
if let Ok(event) = result {
|
||||||
debug!("partitioning");
|
|
||||||
let (trs, reqs) = Self::partition_requests(req_vers);
|
|
||||||
debug!("trs: {} reqs: {}", trs.len(), reqs.len());
|
|
||||||
|
|
||||||
// Process the transactions in parallel and then log the successful ones.
|
|
||||||
for result in self.acc.lock().unwrap().process_verified_transactions(trs) {
|
|
||||||
if let Ok(tr) = result {
|
|
||||||
self.historian_input
|
self.historian_input
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.send(Signal::Event(Event::Transaction(tr)))?;
|
.send(Signal::Event(event))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -347,17 +340,15 @@ impl AccountantSkel {
|
||||||
// Let validators know they should not attempt to process additional
|
// Let validators know they should not attempt to process additional
|
||||||
// transactions in parallel.
|
// transactions in parallel.
|
||||||
self.historian_input.lock().unwrap().send(Signal::Tick)?;
|
self.historian_input.lock().unwrap().send(Signal::Tick)?;
|
||||||
|
|
||||||
debug!("after historian_input");
|
debug!("after historian_input");
|
||||||
|
|
||||||
// Process the remaining requests serially.
|
Ok(())
|
||||||
let rsps = reqs.into_iter()
|
}
|
||||||
|
|
||||||
|
fn process_requests(&self, reqs: Vec<(Request, SocketAddr)>) -> Vec<(Response, SocketAddr)> {
|
||||||
|
reqs.into_iter()
|
||||||
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
|
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
|
||||||
.collect();
|
.collect()
|
||||||
|
|
||||||
debug!("returning rsps");
|
|
||||||
|
|
||||||
Ok(rsps)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn serialize_response(
|
fn serialize_response(
|
||||||
|
@ -409,9 +400,19 @@ impl AccountantSkel {
|
||||||
v
|
v
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
debug!("process_packets");
|
|
||||||
let rsps = obj.process_packets(req_vers)?;
|
debug!("partitioning");
|
||||||
debug!("done process_packets");
|
let (events, reqs) = Self::partition_requests(req_vers);
|
||||||
|
debug!("events: {} reqs: {}", events.len(), reqs.len());
|
||||||
|
|
||||||
|
debug!("process_events");
|
||||||
|
obj.process_events(events)?;
|
||||||
|
debug!("done process_events");
|
||||||
|
|
||||||
|
debug!("process_requests");
|
||||||
|
let rsps = obj.process_requests(reqs);
|
||||||
|
debug!("done process_requests");
|
||||||
|
|
||||||
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
||||||
if !blobs.is_empty() {
|
if !blobs.is_empty() {
|
||||||
info!("process: sending blobs: {}", blobs.len());
|
info!("process: sending blobs: {}", blobs.len());
|
||||||
|
@ -436,13 +437,12 @@ impl AccountantSkel {
|
||||||
for msgs in &blobs {
|
for msgs in &blobs {
|
||||||
let blob = msgs.read().unwrap();
|
let blob = msgs.read().unwrap();
|
||||||
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
|
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
|
||||||
|
let acc = obj.acc.lock().unwrap();
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
obj.acc.lock().unwrap().register_entry_id(&entry.id);
|
acc.register_entry_id(&entry.id);
|
||||||
|
for result in acc.process_verified_events(entry.events) {
|
||||||
obj.acc
|
result?;
|
||||||
.lock()
|
}
|
||||||
.unwrap()
|
|
||||||
.process_verified_events(entry.events)?;
|
|
||||||
}
|
}
|
||||||
//TODO respond back to leader with hash of the state
|
//TODO respond back to leader with hash of the state
|
||||||
}
|
}
|
||||||
|
@ -732,7 +732,7 @@ mod tests {
|
||||||
use signature::{KeyPair, KeyPairUtil};
|
use signature::{KeyPair, KeyPairUtil};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io::sink;
|
use std::io::sink;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::mpsc::sync_channel;
|
use std::sync::mpsc::sync_channel;
|
||||||
|
@ -775,7 +775,6 @@ mod tests {
|
||||||
// Entry OR if the verifier tries to parallelize across multiple Entries.
|
// Entry OR if the verifier tries to parallelize across multiple Entries.
|
||||||
let mint = Mint::new(2);
|
let mint = Mint::new(2);
|
||||||
let acc = Accountant::new(&mint);
|
let acc = Accountant::new(&mint);
|
||||||
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
|
|
||||||
let (input, event_receiver) = sync_channel(10);
|
let (input, event_receiver) = sync_channel(10);
|
||||||
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
||||||
let skel = AccountantSkel::new(acc, input, historian);
|
let skel = AccountantSkel::new(acc, input, historian);
|
||||||
|
@ -783,13 +782,13 @@ mod tests {
|
||||||
// Process a batch that includes a transaction that receives two tokens.
|
// Process a batch that includes a transaction that receives two tokens.
|
||||||
let alice = KeyPair::new();
|
let alice = KeyPair::new();
|
||||||
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
||||||
let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)];
|
let events = vec![Event::Transaction(tr)];
|
||||||
assert!(skel.process_packets(req_vers).is_ok());
|
assert!(skel.process_events(events).is_ok());
|
||||||
|
|
||||||
// Process a second batch that spends one of those tokens.
|
// Process a second batch that spends one of those tokens.
|
||||||
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
||||||
let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)];
|
let events = vec![Event::Transaction(tr)];
|
||||||
assert!(skel.process_packets(req_vers).is_ok());
|
assert!(skel.process_events(events).is_ok());
|
||||||
|
|
||||||
// Collect the ledger and feed it to a new accountant.
|
// Collect the ledger and feed it to a new accountant.
|
||||||
skel.historian_input
|
skel.historian_input
|
||||||
|
@ -805,7 +804,11 @@ mod tests {
|
||||||
// the account balance below zero before the credit is added.
|
// the account balance below zero before the credit is added.
|
||||||
let acc = Accountant::new(&mint);
|
let acc = Accountant::new(&mint);
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
acc.process_verified_events(entry.events).unwrap();
|
assert!(
|
||||||
|
acc.process_verified_events(entry.events)
|
||||||
|
.into_iter()
|
||||||
|
.all(|x| x.is_ok())
|
||||||
|
);
|
||||||
}
|
}
|
||||||
assert_eq!(acc.get_balance(&alice.pubkey()), Some(1));
|
assert_eq!(acc.get_balance(&alice.pubkey()), Some(1));
|
||||||
}
|
}
|
||||||
|
@ -1099,7 +1102,7 @@ mod bench {
|
||||||
let skel = AccountantSkel::new(acc, input, historian);
|
let skel = AccountantSkel::new(acc, input, historian);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
assert!(skel.process_packets(req_vers).is_ok());
|
assert!(skel.process_events(req_vers).is_ok());
|
||||||
let duration = now.elapsed();
|
let duration = now.elapsed();
|
||||||
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
|
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
|
||||||
let tps = txs as f64 / sec;
|
let tps = txs as f64 / sec;
|
||||||
|
|
|
@ -103,7 +103,13 @@ fn main() {
|
||||||
let mut last_id = entry1.id;
|
let mut last_id = entry1.id;
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
last_id = entry.id;
|
last_id = entry.id;
|
||||||
acc.process_verified_events(entry.events).unwrap();
|
let results = acc.process_verified_events(entry.events);
|
||||||
|
for result in results {
|
||||||
|
if let Err(e) = result {
|
||||||
|
eprintln!("failed to process event {:?}", e);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
acc.register_entry_id(&last_id);
|
acc.register_entry_id(&last_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue