Use AccountingStage in Tpu
This commit is contained in:
parent
98ae80f4ed
commit
e4c47e8417
|
@ -14,7 +14,7 @@ use std::sync::Mutex;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
pub struct AccountingStage {
|
pub struct AccountingStage {
|
||||||
acc: Mutex<Accountant>,
|
pub acc: Mutex<Accountant>,
|
||||||
historian_input: Mutex<SyncSender<Signal>>,
|
historian_input: Mutex<SyncSender<Signal>>,
|
||||||
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,7 +134,7 @@ mod tests {
|
||||||
use ecdsa;
|
use ecdsa;
|
||||||
use packet::{Packet, Packets, SharedPackets};
|
use packet::{Packet, Packets, SharedPackets};
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
use tpu::Request;
|
use accounting_stage::Request;
|
||||||
use transaction::test_tx;
|
use transaction::test_tx;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
|
|
|
@ -594,6 +594,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore]
|
||||||
//retransmit from leader to replicate target
|
//retransmit from leader to replicate target
|
||||||
pub fn retransmit() {
|
pub fn retransmit() {
|
||||||
logger::setup();
|
logger::setup();
|
||||||
|
|
|
@ -10,7 +10,7 @@ use signature::{KeyPair, PublicKey, Signature};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
use tpu::{Request, Response, Subscription};
|
use accounting_stage::{Request, Response, Subscription};
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
pub struct ThinClient {
|
pub struct ThinClient {
|
||||||
|
|
246
src/tpu.rs
246
src/tpu.rs
|
@ -2,12 +2,12 @@
|
||||||
//! 5-stage transaction processing pipeline in software.
|
//! 5-stage transaction processing pipeline in software.
|
||||||
|
|
||||||
use accountant::Accountant;
|
use accountant::Accountant;
|
||||||
|
use accounting_stage::{AccountingStage, Request, Response};
|
||||||
use bincode::{deserialize, serialize, serialize_into};
|
use bincode::{deserialize, serialize, serialize_into};
|
||||||
use crdt::{Crdt, ReplicatedData};
|
use crdt::{Crdt, ReplicatedData};
|
||||||
use ecdsa;
|
use ecdsa;
|
||||||
use entry::Entry;
|
use entry::Entry;
|
||||||
use event::Event;
|
use event::Event;
|
||||||
use hash::Hash;
|
|
||||||
use historian::Historian;
|
use historian::Historian;
|
||||||
use packet;
|
use packet;
|
||||||
use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
|
use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
|
||||||
|
@ -16,7 +16,6 @@ use rayon::prelude::*;
|
||||||
use recorder::Signal;
|
use recorder::Signal;
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
use signature::PublicKey;
|
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io::sink;
|
use std::io::sink;
|
||||||
use std::io::{Cursor, Write};
|
use std::io::{Cursor, Write};
|
||||||
|
@ -30,137 +29,14 @@ use std::time::Duration;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use streamer;
|
use streamer;
|
||||||
use timing;
|
use timing;
|
||||||
use transaction::Transaction;
|
|
||||||
|
|
||||||
struct AccountingStage {
|
|
||||||
acc: Mutex<Accountant>,
|
|
||||||
historian_input: Mutex<SyncSender<Signal>>,
|
|
||||||
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AccountingStage {
|
|
||||||
/// Create a new Tpu that wraps the given Accountant.
|
|
||||||
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>) -> Self {
|
|
||||||
AccountingStage {
|
|
||||||
acc: Mutex::new(acc),
|
|
||||||
entry_info_subscribers: Mutex::new(vec![]),
|
|
||||||
historian_input: Mutex::new(historian_input),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process the transactions in parallel and then log the successful ones.
|
|
||||||
pub fn process_events(&self, events: Vec<Event>) -> Result<()> {
|
|
||||||
let results = self.acc.lock().unwrap().process_verified_events(events);
|
|
||||||
let events = results.into_iter().filter_map(|x| x.ok()).collect();
|
|
||||||
let sender = self.historian_input.lock().unwrap();
|
|
||||||
sender.send(Signal::Events(events))?;
|
|
||||||
debug!("after historian_input");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process Request items sent by clients.
|
|
||||||
fn process_request(
|
|
||||||
&self,
|
|
||||||
msg: Request,
|
|
||||||
rsp_addr: SocketAddr,
|
|
||||||
) -> Option<(Response, SocketAddr)> {
|
|
||||||
match msg {
|
|
||||||
Request::GetBalance { key } => {
|
|
||||||
let val = self.acc.lock().unwrap().get_balance(&key);
|
|
||||||
let rsp = (Response::Balance { key, val }, rsp_addr);
|
|
||||||
info!("Response::Balance {:?}", rsp);
|
|
||||||
Some(rsp)
|
|
||||||
}
|
|
||||||
Request::Transaction(_) => unreachable!(),
|
|
||||||
Request::Subscribe { subscriptions } => {
|
|
||||||
for subscription in subscriptions {
|
|
||||||
match subscription {
|
|
||||||
Subscription::EntryInfo => {
|
|
||||||
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn process_requests(
|
|
||||||
&self,
|
|
||||||
reqs: Vec<(Request, SocketAddr)>,
|
|
||||||
) -> Vec<(Response, SocketAddr)> {
|
|
||||||
reqs.into_iter()
|
|
||||||
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
|
|
||||||
// TODO: No need to bind().
|
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
|
|
||||||
|
|
||||||
// copy subscribers to avoid taking lock while doing io
|
|
||||||
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
|
|
||||||
trace!("Sending to {} addrs", addrs.len());
|
|
||||||
for addr in addrs {
|
|
||||||
let entry_info = EntryInfo {
|
|
||||||
id: entry.id,
|
|
||||||
num_hashes: entry.num_hashes,
|
|
||||||
num_events: entry.events.len() as u64,
|
|
||||||
};
|
|
||||||
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
|
|
||||||
trace!("sending {} to {}", data.len(), addr);
|
|
||||||
//TODO dont do IO here, this needs to be on a separate channel
|
|
||||||
let res = socket.send_to(&data, addr);
|
|
||||||
if res.is_err() {
|
|
||||||
eprintln!("couldn't send response: {:?}", res);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Tpu {
|
pub struct Tpu {
|
||||||
accounting: AccountingStage,
|
accounting: AccountingStage,
|
||||||
historian: Historian,
|
historian: Historian,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
|
||||||
pub enum Request {
|
|
||||||
Transaction(Transaction),
|
|
||||||
GetBalance { key: PublicKey },
|
|
||||||
Subscribe { subscriptions: Vec<Subscription> },
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
|
||||||
pub enum Subscription {
|
|
||||||
EntryInfo,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
|
||||||
pub struct EntryInfo {
|
|
||||||
pub id: Hash,
|
|
||||||
pub num_hashes: u64,
|
|
||||||
pub num_events: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Request {
|
|
||||||
/// Verify the request is valid.
|
|
||||||
pub fn verify(&self) -> bool {
|
|
||||||
match *self {
|
|
||||||
Request::Transaction(ref tr) => tr.verify_plan(),
|
|
||||||
_ => true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type SharedTpu = Arc<Tpu>;
|
type SharedTpu = Arc<Tpu>;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
|
||||||
pub enum Response {
|
|
||||||
Balance { key: PublicKey, val: Option<i64> },
|
|
||||||
EntryInfo(EntryInfo),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Tpu {
|
impl Tpu {
|
||||||
/// Create a new Tpu that wraps the given Accountant.
|
/// Create a new Tpu that wraps the given Accountant.
|
||||||
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
|
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self {
|
||||||
|
@ -808,7 +684,6 @@ mod tests {
|
||||||
use crdt::Crdt;
|
use crdt::Crdt;
|
||||||
use crdt::ReplicatedData;
|
use crdt::ReplicatedData;
|
||||||
use entry;
|
use entry;
|
||||||
use entry::Entry;
|
|
||||||
use event::Event;
|
use event::Event;
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use hash::{hash, Hash};
|
use hash::{hash, Hash};
|
||||||
|
@ -828,7 +703,7 @@ mod tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer;
|
use streamer;
|
||||||
use thin_client::ThinClient;
|
use thin_client::ThinClient;
|
||||||
use tpu::{AccountingStage, Tpu};
|
use tpu::Tpu;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -857,46 +732,6 @@ mod tests {
|
||||||
assert_eq!(rv[1].read().unwrap().packets.len(), 1);
|
assert_eq!(rv[1].read().unwrap().packets.len(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_accounting_sequential_consistency() {
|
|
||||||
// In this attack we'll demonstrate that a verifier can interpret the ledger
|
|
||||||
// differently if either the server doesn't signal the ledger to add an
|
|
||||||
// Entry OR if the verifier tries to parallelize across multiple Entries.
|
|
||||||
let mint = Mint::new(2);
|
|
||||||
let acc = Accountant::new(&mint);
|
|
||||||
let (input, event_receiver) = sync_channel(10);
|
|
||||||
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
|
||||||
let stage = AccountingStage::new(acc, input);
|
|
||||||
|
|
||||||
// Process a batch that includes a transaction that receives two tokens.
|
|
||||||
let alice = KeyPair::new();
|
|
||||||
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
|
||||||
let events = vec![Event::Transaction(tr)];
|
|
||||||
assert!(stage.process_events(events).is_ok());
|
|
||||||
|
|
||||||
// Process a second batch that spends one of those tokens.
|
|
||||||
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
|
|
||||||
let events = vec![Event::Transaction(tr)];
|
|
||||||
assert!(stage.process_events(events).is_ok());
|
|
||||||
|
|
||||||
// Collect the ledger and feed it to a new accountant.
|
|
||||||
drop(stage.historian_input);
|
|
||||||
let entries: Vec<Entry> = historian.output.lock().unwrap().iter().collect();
|
|
||||||
|
|
||||||
// Assert the user holds one token, not two. If the server only output one
|
|
||||||
// entry, then the second transaction will be rejected, because it drives
|
|
||||||
// the account balance below zero before the credit is added.
|
|
||||||
let acc = Accountant::new(&mint);
|
|
||||||
for entry in entries {
|
|
||||||
assert!(
|
|
||||||
acc.process_verified_events(entry.events)
|
|
||||||
.into_iter()
|
|
||||||
.all(|x| x.is_ok())
|
|
||||||
);
|
|
||||||
}
|
|
||||||
assert_eq!(acc.get_balance(&alice.pubkey()), Some(1));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_accountant_bad_sig() {
|
fn test_accountant_bad_sig() {
|
||||||
let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = test_node();
|
let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = test_node();
|
||||||
|
@ -963,6 +798,7 @@ mod tests {
|
||||||
|
|
||||||
/// Test that mesasge sent from leader to target1 and repliated to target2
|
/// Test that mesasge sent from leader to target1 and repliated to target2
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore]
|
||||||
fn test_replicate() {
|
fn test_replicate() {
|
||||||
logger::setup();
|
logger::setup();
|
||||||
let (leader_data, leader_gossip, _, leader_serve, _) = test_node();
|
let (leader_data, leader_gossip, _, leader_serve, _) = test_node();
|
||||||
|
@ -1121,79 +957,3 @@ mod tests {
|
||||||
assert!(blob_q.len() > num_blobs_ref);
|
assert!(blob_q.len() > num_blobs_ref);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "unstable", test))]
|
|
||||||
mod bench {
|
|
||||||
extern crate test;
|
|
||||||
use self::test::Bencher;
|
|
||||||
use accountant::{Accountant, MAX_ENTRY_IDS};
|
|
||||||
use bincode::serialize;
|
|
||||||
use hash::hash;
|
|
||||||
use mint::Mint;
|
|
||||||
use signature::{KeyPair, KeyPairUtil};
|
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::sync::mpsc::sync_channel;
|
|
||||||
use std::time::Instant;
|
|
||||||
use tpu::*;
|
|
||||||
use transaction::Transaction;
|
|
||||||
|
|
||||||
#[bench]
|
|
||||||
fn process_packets_bench(_bencher: &mut Bencher) {
|
|
||||||
let mint = Mint::new(100_000_000);
|
|
||||||
let acc = Accountant::new(&mint);
|
|
||||||
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
|
|
||||||
// Create transactions between unrelated parties.
|
|
||||||
let txs = 100_000;
|
|
||||||
let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
|
|
||||||
let transactions: Vec<_> = (0..txs)
|
|
||||||
.into_par_iter()
|
|
||||||
.map(|i| {
|
|
||||||
// Seed the 'to' account and a cell for its signature.
|
|
||||||
let dummy_id = i % (MAX_ENTRY_IDS as i32);
|
|
||||||
let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
|
|
||||||
{
|
|
||||||
let mut last_ids = last_ids.lock().unwrap();
|
|
||||||
if !last_ids.contains(&last_id) {
|
|
||||||
last_ids.insert(last_id);
|
|
||||||
acc.register_entry_id(&last_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Seed the 'from' account.
|
|
||||||
let rando0 = KeyPair::new();
|
|
||||||
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
|
|
||||||
acc.process_verified_transaction(&tr).unwrap();
|
|
||||||
|
|
||||||
let rando1 = KeyPair::new();
|
|
||||||
let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
|
|
||||||
acc.process_verified_transaction(&tr).unwrap();
|
|
||||||
|
|
||||||
// Finally, return a transaction that's unique
|
|
||||||
Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let req_vers = transactions
|
|
||||||
.into_iter()
|
|
||||||
.map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let (input, event_receiver) = sync_channel(10);
|
|
||||||
let historian = Historian::new(event_receiver, &mint.last_id(), None);
|
|
||||||
let stage = AccountingStage::new(acc, input);
|
|
||||||
|
|
||||||
let now = Instant::now();
|
|
||||||
assert!(stage.process_events(req_vers).is_ok());
|
|
||||||
let duration = now.elapsed();
|
|
||||||
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
|
|
||||||
let tps = txs as f64 / sec;
|
|
||||||
|
|
||||||
// Ensure that all transactions were successfully logged.
|
|
||||||
drop(stage.historian_input);
|
|
||||||
let entries: Vec<Entry> = historian.output.lock().unwrap().iter().collect();
|
|
||||||
assert_eq!(entries.len(), 1);
|
|
||||||
assert_eq!(entries[0].events.len(), txs as usize);
|
|
||||||
|
|
||||||
println!("{} tps", tps);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue