Merge pull request #128 from garious/faster-demo

Utilize parallelized accountant in demo
This commit is contained in:
Greg Fitzgerald 2018-04-27 08:47:42 -06:00 committed by GitHub
commit 7077f4cbe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 283 additions and 102 deletions

View File

@ -36,6 +36,10 @@ path = "src/bin/genesis-demo.rs"
name = "solana-mint"
path = "src/bin/mint.rs"
[[bin]]
name = "solana-mint-demo"
path = "src/bin/mint-demo.rs"
[badges]
codecov = { repository = "solana-labs/solana", branch = "master", service = "github" }

View File

@ -39,25 +39,28 @@ $ cd solana
The testnode server is initialized with a ledger from stdin and
generates new ledger entries on stdout. To create the input ledger, we'll need
to create *the mint* and use it to generate a *genesis ledger*. It's done in
two steps because the mint.json file contains a private key that will be
two steps because the mint-demo.json file contains private keys that will be
used later in this demo.
```bash
$ echo 1000000000 | cargo run --release --bin solana-mint | tee mint.json
$ cat mint.json | cargo run --release --bin solana-genesis | tee genesis.log
$ echo 1000000000 | cargo run --release --bin solana-mint-demo > mint-demo.json
$ cat mint-demo.json | cargo run --release --bin solana-genesis-demo > genesis.log
```
Now you can start the server:
```bash
$ cat genesis.log | cargo run --release --bin solana-testnode | tee transactions0.log
$ cat genesis.log | cargo run --release --bin solana-testnode > transactions0.log
```
Wait a few seconds for the server to initialize. It will print "Ready." when it's safe
to start sending it transactions.
Then, in a separate shell, let's execute some transactions. Note we pass in
the JSON configuration file here, not the genesis ledger.
```bash
$ cat mint.json | cargo run --release --bin solana-client-demo
$ cat mint-demo.json | cargo run --release --bin solana-client-demo
```
Now kill the server with Ctrl-C, and take a look at the ledger. You should
@ -73,14 +76,14 @@ Now restart the server from where we left off. Pass it both the genesis ledger,
the transaction ledger.
```bash
$ cat genesis.log transactions0.log | cargo run --release --bin solana-testnode | tee transactions1.log
$ cat genesis.log transactions0.log | cargo run --release --bin solana-testnode > transactions1.log
```
Lastly, run the client demo again, and verify that all funds were spent in the
previous round, and so no additional transactions are added.
```bash
$ cat mint.json | cargo run --release --bin solana-client-demo
$ cat mint-demo.json | cargo run --release --bin solana-client-demo
```
Stop the server again, and verify there are only Tick entries, and no Transaction entries.

View File

@ -33,6 +33,7 @@ pub struct AccountantSkel<W: Write + Send + 'static> {
last_id: Hash,
writer: W,
historian: Historian,
entry_info_subscribers: Vec<SocketAddr>,
}
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
@ -41,6 +42,19 @@ pub enum Request {
Transaction(Transaction),
GetBalance { key: PublicKey },
GetLastId,
Subscribe { subscriptions: Vec<Subscription> },
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Subscription {
EntryInfo,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EntryInfo {
pub id: Hash,
pub num_hashes: u64,
pub num_events: u64,
}
impl Request {
@ -56,7 +70,7 @@ impl Request {
#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
Balance { key: PublicKey, val: Option<i64> },
Entries { entries: Vec<Entry> },
EntryInfo(EntryInfo),
LastId { id: Hash },
}
@ -68,6 +82,22 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
last_id,
writer,
historian,
entry_info_subscribers: vec![],
}
}
fn notify_entry_info_subscribers(&mut self, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
for addr in &self.entry_info_subscribers {
let entry_info = EntryInfo {
id: entry.id,
num_hashes: entry.num_hashes,
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
let _res = socket.send_to(&data, addr);
}
}
@ -77,6 +107,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
self.last_id = entry.id;
self.acc.register_entry_id(&self.last_id);
writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
self.notify_entry_info_subscribers(&entry);
}
self.last_id
}
@ -94,6 +125,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
Request::GetLastId => Some((Response::LastId { id: self.sync() }, rsp_addr)),
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
match subscription {
Subscription::EntryInfo => self.entry_info_subscribers.push(rsp_addr),
}
}
None
}
}
}
@ -242,7 +281,11 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
blob_sender.send(blobs)?;
}
packet_recycler.recycle(msgs);
// Write new entries to the ledger and notify subscribers.
obj.lock().unwrap().sync();
}
Ok(())
}
@ -286,9 +329,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
&packet_recycler,
&blob_recycler,
);
if e.is_err() && exit.load(Ordering::Relaxed) {
if e.is_err() {
// Assume this was a timeout, so sync any empty entries.
skel.lock().unwrap().sync();
if exit.load(Ordering::Relaxed) {
break;
}
}
});
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
}
@ -426,7 +474,7 @@ mod tests {
let socket = UdpSocket::bind(send_addr).unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let acc = AccountantStub::new(&addr, socket);
let mut acc = AccountantStub::new(&addr, socket);
let last_id = acc.get_last_id().wait().unwrap();
let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);

View File

@ -3,11 +3,12 @@
//! this object instead of writing messages to the network directly. The binary
//! encoding of its messages are unstable and may change in future releases.
use accountant_skel::{Request, Response};
use accountant_skel::{Request, Response, Subscription};
use bincode::{deserialize, serialize};
use futures::future::{err, ok, FutureResult};
use futures::future::{ok, FutureResult};
use hash::Hash;
use signature::{KeyPair, PublicKey, Signature};
use std::collections::HashMap;
use std::io;
use std::net::UdpSocket;
use transaction::Transaction;
@ -15,6 +16,9 @@ use transaction::Transaction;
pub struct AccountantStub {
pub addr: String,
pub socket: UdpSocket,
last_id: Option<Hash>,
num_events: u64,
balances: HashMap<PublicKey, Option<i64>>,
}
impl AccountantStub {
@ -22,9 +26,43 @@ impl AccountantStub {
/// over `socket`. To receive responses, the caller must bind `socket`
/// to a public address before invoking AccountantStub methods.
pub fn new(addr: &str, socket: UdpSocket) -> Self {
AccountantStub {
let stub = AccountantStub {
addr: addr.to_string(),
socket,
last_id: None,
num_events: 0,
balances: HashMap::new(),
};
stub.init();
stub
}
pub fn init(&self) {
let subscriptions = vec![Subscription::EntryInfo];
let req = Request::Subscribe { subscriptions };
let data = serialize(&req).expect("serialize Subscribe");
let _res = self.socket.send_to(&data, &self.addr);
}
pub fn recv_response(&self) -> io::Result<Response> {
let mut buf = vec![0u8; 1024];
self.socket.recv_from(&mut buf)?;
let resp = deserialize(&buf).expect("deserialize balance");
Ok(resp)
}
pub fn process_response(&mut self, resp: Response) {
match resp {
Response::Balance { key, val } => {
self.balances.insert(key, val);
}
Response::LastId { id } => {
self.last_id = Some(id);
}
Response::EntryInfo(entry_info) => {
self.last_id = Some(entry_info.id);
self.num_events += entry_info.num_events;
}
}
}
@ -52,42 +90,67 @@ impl AccountantStub {
/// Request the balance of the user holding `pubkey`. This method blocks
/// until the server sends a response. If the response packet is dropped
/// by the network, this method will hang indefinitely.
pub fn get_balance(&self, pubkey: &PublicKey) -> FutureResult<i64, i64> {
pub fn get_balance(&mut self, pubkey: &PublicKey) -> FutureResult<i64, i64> {
let req = Request::GetBalance { key: *pubkey };
let data = serialize(&req).expect("serialize GetBalance");
self.socket
.send_to(&data, &self.addr)
.expect("buffer error");
let mut buf = vec![0u8; 1024];
self.socket.recv_from(&mut buf).expect("buffer error");
let resp = deserialize(&buf).expect("deserialize balance");
if let Response::Balance { key, val } = resp {
assert_eq!(key, *pubkey);
return match val {
Some(x) => ok(x),
_ => err(0),
};
let mut done = false;
while !done {
let resp = self.recv_response().expect("recv response");
if let &Response::Balance { ref key, .. } = &resp {
done = key == pubkey;
}
err(0)
self.process_response(resp);
}
ok(self.balances[pubkey].unwrap())
}
/// Request the last Entry ID from the server. This method blocks
/// until the server sends a response. At the time of this writing,
/// it also has the side-effect of causing the server to log any
/// entries that have been published by the Historian.
pub fn get_last_id(&self) -> FutureResult<Hash, ()> {
pub fn get_last_id(&mut self) -> FutureResult<Hash, ()> {
let req = Request::GetLastId;
let data = serialize(&req).expect("serialize GetId");
self.socket
.send_to(&data, &self.addr)
.expect("buffer error");
let mut buf = vec![0u8; 1024];
self.socket.recv_from(&mut buf).expect("buffer error");
let resp = deserialize(&buf).expect("deserialize Id");
if let Response::LastId { id } = resp {
return ok(id);
let mut done = false;
while !done {
let resp = self.recv_response().expect("recv response");
if let &Response::LastId { .. } = &resp {
done = true;
}
ok(Default::default())
self.process_response(resp);
}
ok(self.last_id.unwrap_or(Hash::default()))
}
/// Return the number of transactions the server processed since creating
/// this stub instance.
pub fn transaction_count(&mut self) -> u64 {
// Wait for at least one EntryInfo.
let mut done = false;
while !done {
let resp = self.recv_response().expect("recv response");
if let &Response::EntryInfo(_) = &resp {
done = true;
}
self.process_response(resp);
}
// Then take the rest.
self.socket.set_nonblocking(true).expect("set nonblocking");
loop {
match self.recv_response() {
Err(_) => break,
Ok(resp) => self.process_response(resp),
}
}
self.socket.set_nonblocking(false).expect("set blocking");
self.num_events
}
}
@ -128,7 +191,7 @@ mod tests {
let socket = UdpSocket::bind(send_addr).unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let acc = AccountantStub::new(addr, socket);
let mut acc = AccountantStub::new(addr, socket);
let last_id = acc.get_last_id().wait().unwrap();
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap();

View File

@ -4,21 +4,22 @@ extern crate isatty;
extern crate rayon;
extern crate serde_json;
extern crate solana;
extern crate untrusted;
use futures::Future;
use getopts::Options;
use isatty::stdin_isatty;
use rayon::prelude::*;
use solana::accountant_stub::AccountantStub;
use solana::mint::Mint;
use solana::mint::MintDemo;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::transaction::Transaction;
use std::env;
use std::io::{stdin, Read};
use std::net::UdpSocket;
use std::process::exit;
use std::thread::sleep;
use std::time::{Duration, Instant};
use std::time::Instant;
use untrusted::Input;
fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <mint.json> | {} [options]\n\n", program);
@ -75,50 +76,49 @@ fn main() {
exit(1);
}
let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| {
println!("Parsing stdin...");
let demo: MintDemo = serde_json::from_str(&buffer).unwrap_or_else(|e| {
eprintln!("failed to parse json: {}", e);
exit(1);
});
let mint_keypair = mint.keypair();
let mint_pubkey = mint.pubkey();
let socket = UdpSocket::bind(&send_addr).unwrap();
println!("Stub new");
let acc = AccountantStub::new(&addr, socket);
println!("Get last id");
let mut acc = AccountantStub::new(&addr, socket);
println!("Get last ID...");
let last_id = acc.get_last_id().wait().unwrap();
println!("Get Balance");
let mint_balance = acc.get_balance(&mint_pubkey).wait().unwrap();
println!("Mint's Initial Balance {}", mint_balance);
println!("Creating keypairs...");
let txs = demo.users.len() / 2;
let keypairs: Vec<_> = demo.users
.into_par_iter()
.map(|(pkcs8, _)| KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap())
.collect();
let keypair_pairs: Vec<_> = keypairs.chunks(2).collect();
println!("Signing transactions...");
let txs = 1_000_000;
let now = Instant::now();
let transactions: Vec<_> = (0..txs)
let transactions: Vec<_> = keypair_pairs
.into_par_iter()
.map(|_| {
let rando_pubkey = KeyPair::new().pubkey();
Transaction::new(&mint_keypair, rando_pubkey, 1, last_id)
})
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id))
.collect();
let duration = now.elapsed();
let ns = duration.as_secs() * 2_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = f64::from(txs) / ns as f64;
let nsps = ns as f64 / f64::from(txs);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = txs as f64 / ns as f64;
let nsps = ns as f64 / txs as f64;
println!(
"Done. {} thousand signatures per second, {}us per signature",
bsps * 1_000_000_f64,
nsps / 1_000_f64
);
let initial_tx_count = acc.transaction_count();
println!("Transfering {} transactions in {} batches", txs, threads);
let now = Instant::now();
let sz = transactions.len() / threads;
let chunks: Vec<_> = transactions.chunks(sz).collect();
let _: Vec<_> = chunks
.into_par_iter()
.map(|trs| {
chunks.into_par_iter().for_each(|trs| {
println!("Transferring 1 unit {} times...", trs.len());
let send_addr = "0.0.0.0:0";
let socket = UdpSocket::bind(send_addr).unwrap();
@ -126,23 +126,18 @@ fn main() {
for tr in trs {
acc.transfer_signed(tr.clone()).unwrap();
}
()
})
.collect();
println!("Waiting for last transaction to be confirmed...",);
let mut val = mint_balance;
let mut prev = 0;
while val != prev {
sleep(Duration::from_millis(20));
prev = val;
val = acc.get_balance(&mint_pubkey).wait().unwrap();
});
println!("Waiting for half the transactions to complete...",);
let mut tx_count = acc.transaction_count();
while tx_count < transactions.len() as u64 / 2 {
tx_count = acc.transaction_count();
}
println!("Mint's Final Balance {}", val);
let txs = mint_balance - val;
println!("Successful transactions {}", txs);
let txs = tx_count - initial_tx_count;
println!("Transactions processed {}", txs);
let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (txs * 1_000_000_000) as f64 / ns as f64;
println!("Done. {} tps!", tps);
println!("Done. {} tps", tps);
}

View File

@ -1,21 +1,23 @@
extern crate isatty;
extern crate rayon;
extern crate ring;
extern crate serde_json;
extern crate solana;
extern crate untrusted;
use isatty::stdin_isatty;
use solana::entry::create_entry;
use rayon::prelude::*;
use solana::accountant::MAX_ENTRY_IDS;
use solana::entry::{create_entry, next_tick};
use solana::event::Event;
use solana::hash::Hash;
use solana::mint::Mint;
use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
use solana::mint::MintDemo;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::transaction::Transaction;
use std::io::{stdin, Read};
use std::process::exit;
use untrusted::Input;
fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event {
Event::Transaction(Transaction::new(from, to, tokens, last_id))
}
// Generate a ledger with lots and lots of accounts.
fn main() {
if stdin_isatty() {
eprintln!("nothing found on stdin, expected a json file");
@ -29,20 +31,39 @@ fn main() {
exit(1);
}
let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| {
let demo: MintDemo = serde_json::from_str(&buffer).unwrap_or_else(|e| {
eprintln!("failed to parse json: {}", e);
exit(1);
});
let mut entries = mint.create_entries();
let from = mint.keypair();
let seed = mint.seed();
let alice = (KeyPair::new().pubkey(), 200);
let bob = (KeyPair::new().pubkey(), 100);
let events = vec![transfer(&from, alice, seed), transfer(&from, bob, seed)];
entries.push(create_entry(&seed, 0, events));
let num_accounts = demo.users.len();
let last_id = demo.mint.last_id();
let mint_keypair = demo.mint.keypair();
for entry in entries {
eprintln!("Signing {} transactions...", num_accounts);
let events: Vec<_> = demo.users
.into_par_iter()
.map(|(pkcs8, tokens)| {
let rando = KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap();
let tr = Transaction::new(&mint_keypair, rando.pubkey(), tokens, last_id);
Event::Transaction(tr)
})
.collect();
for entry in demo.mint.create_entries() {
println!("{}", serde_json::to_string(&entry).unwrap());
}
eprintln!("Logging the creation of {} accounts...", num_accounts);
let entry = create_entry(&last_id, 0, events);
println!("{}", serde_json::to_string(&entry).unwrap());
eprintln!("Creating {} empty entries...", MAX_ENTRY_IDS);
// Offer client lots of entry IDs to use for each transaction's last_id.
let mut last_id = last_id;
for _ in 0..MAX_ENTRY_IDS {
let entry = next_tick(&last_id, 1);
last_id = entry.id;
let serialized = serde_json::to_string(&entry).unwrap_or_else(|e| {
eprintln!("failed to serialize: {}", e);
exit(1);

33
src/bin/mint-demo.rs Normal file
View File

@ -0,0 +1,33 @@
extern crate rayon;
extern crate ring;
extern crate serde_json;
extern crate solana;
use rayon::prelude::*;
use ring::rand::SystemRandom;
use solana::mint::{Mint, MintDemo};
use solana::signature::KeyPair;
use std::io;
fn main() {
let mut input_text = String::new();
io::stdin().read_line(&mut input_text).unwrap();
let trimmed = input_text.trim();
let tokens = trimmed.parse::<i64>().unwrap();
let mint = Mint::new(tokens);
let tokens_per_user = 1_000;
let num_accounts = tokens / tokens_per_user;
let rnd = SystemRandom::new();
let users: Vec<_> = (0..num_accounts)
.into_par_iter()
.map(|_| {
let pkcs8 = KeyPair::generate_pkcs8(&rnd).unwrap().to_vec();
(pkcs8, tokens_per_user)
})
.collect();
let demo = MintDemo { mint, users };
println!("{}", serde_json::to_string(&demo).unwrap());
}

View File

@ -62,6 +62,7 @@ fn main() {
exit(1);
}
eprintln!("Initializing...");
let mut entries = buffer.lines().map(|line| {
serde_json::from_str(&line).unwrap_or_else(|e| {
eprintln!("failed to parse json: {}", e);
@ -71,7 +72,7 @@ fn main() {
// The first item in the ledger is required to be an entry with zero num_hashes,
// which implies its id can be used as the ledger's seed.
entries.next().unwrap();
let entry0 = entries.next().unwrap();
// The second item in the ledger is a special transaction where the to and from
// fields are the same. That entry should be treated as a deposit, not a
@ -84,11 +85,14 @@ fn main() {
};
let acc = Accountant::new_from_deposit(&deposit.unwrap());
acc.register_entry_id(&entry0.id);
acc.register_entry_id(&entry1.id);
let mut last_id = entry1.id;
for entry in entries {
last_id = entry.id;
acc.process_verified_events(entry.events).unwrap();
acc.register_entry_id(&last_id);
}
let historian = Historian::new(&last_id, Some(1000));
@ -99,8 +103,8 @@ fn main() {
stdout(),
historian,
)));
eprintln!("Listening on {}", addr);
let threads = AccountantSkel::serve(&skel, &addr, exit.clone()).unwrap();
eprintln!("Ready. Listening on {}", addr);
for t in threads {
t.join().expect("join");
}

View File

@ -61,7 +61,7 @@ fn add_event_data(hash_data: &mut Vec<u8>, event: &Event) {
}
/// Creates the hash `num_hashes` after `start_hash`. If the event contains
/// signature, the final hash will be a hash of both the previous ID and
/// a signature, the final hash will be a hash of both the previous ID and
/// the signature.
pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash {
let mut id = *start_hash;
@ -76,10 +76,12 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash {
}
if !hash_data.is_empty() {
return extend_and_hash(&id, &hash_data);
}
extend_and_hash(&id, &hash_data)
} else if num_hashes != 0 {
hash(&id)
} else {
id
}
}
/// Creates the next Entry `num_hashes` after `start_hash`.
@ -167,6 +169,8 @@ mod tests {
#[test]
fn test_next_tick() {
let zero = Hash::default();
assert_eq!(next_tick(&zero, 1).num_hashes, 1)
let tick = next_tick(&zero, 1);
assert_eq!(tick.num_hashes, 1);
assert_ne!(tick.id, zero);
}
}

View File

@ -58,6 +58,12 @@ impl Mint {
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MintDemo {
pub mint: Mint,
pub users: Vec<(Vec<u8>, i64)>,
}
#[cfg(test)]
mod tests {
use super::*;