From 1c923d2f9ec88af7dbbe0e46c19d10f186cd28c7 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 16 Apr 2018 16:38:31 -0400 Subject: [PATCH 01/13] Fix entry hash when no events and num_hashes is one --- src/entry.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/entry.rs b/src/entry.rs index e672e71c04..cd23276353 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -61,7 +61,7 @@ fn add_event_data(hash_data: &mut Vec, event: &Event) { } /// Creates the hash `num_hashes` after `start_hash`. If the event contains -/// signature, the final hash will be a hash of both the previous ID and +/// a signature, the final hash will be a hash of both the previous ID and /// the signature. pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash { let mut id = *start_hash; @@ -76,10 +76,12 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash { } if !hash_data.is_empty() { - return extend_and_hash(&id, &hash_data); + extend_and_hash(&id, &hash_data) + } else if num_hashes != 0 { + hash(&id) + } else { + id } - - id } /// Creates the next Entry `num_hashes` after `start_hash`. @@ -167,6 +169,8 @@ mod tests { #[test] fn test_next_tick() { let zero = Hash::default(); - assert_eq!(next_tick(&zero, 1).num_hashes, 1) + let tick = next_tick(&zero, 1); + assert_eq!(tick.num_hashes, 1); + assert_ne!(tick.id, zero); } } From 807ccd15ba9e775f66128347b3530039abcd6072 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 16 Apr 2018 16:39:59 -0400 Subject: [PATCH 02/13] Add solana-mint-demo CLI This extends solana-mint with additional data that will be used by both solana-client-demo and creating the demo's genesis block. --- Cargo.toml | 4 ++++ src/bin/mint-demo.rs | 33 +++++++++++++++++++++++++++++++++ src/mint.rs | 6 ++++++ 3 files changed, 43 insertions(+) create mode 100644 src/bin/mint-demo.rs diff --git a/Cargo.toml b/Cargo.toml index 433712ae68..747cd4f38d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,10 @@ path = "src/bin/genesis-demo.rs" name = "solana-mint" path = "src/bin/mint.rs" +[[bin]] +name = "solana-mint-demo" +path = "src/bin/mint-demo.rs" + [badges] codecov = { repository = "solana-labs/solana", branch = "master", service = "github" } diff --git a/src/bin/mint-demo.rs b/src/bin/mint-demo.rs new file mode 100644 index 0000000000..1a2d9485a1 --- /dev/null +++ b/src/bin/mint-demo.rs @@ -0,0 +1,33 @@ +extern crate rayon; +extern crate ring; +extern crate serde_json; +extern crate solana; + +use solana::mint::{Mint, MintDemo}; +use solana::signature::KeyPair; +use std::io; +use rayon::prelude::*; +use ring::rand::SystemRandom; + +fn main() { + let mut input_text = String::new(); + io::stdin().read_line(&mut input_text).unwrap(); + let trimmed = input_text.trim(); + let tokens = trimmed.parse::().unwrap(); + + let mint = Mint::new(tokens); + let tokens_per_user = 1_000; + let num_accounts = tokens / tokens_per_user; + let rnd = SystemRandom::new(); + + let users: Vec<_> = (0..num_accounts) + .into_par_iter() + .map(|_| { + let pkcs8 = KeyPair::generate_pkcs8(&rnd).unwrap().to_vec(); + (pkcs8, tokens_per_user) + }) + .collect(); + + let demo = MintDemo { mint, users }; + println!("{}", serde_json::to_string(&demo).unwrap()); +} diff --git a/src/mint.rs b/src/mint.rs index f3b6c5e598..754cacaa41 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -58,6 +58,12 @@ impl Mint { } } +#[derive(Serialize, Deserialize, Debug)] +pub struct MintDemo { + pub mint: Mint, + pub users: Vec<(Vec, i64)>, +} + #[cfg(test)] mod tests { use super::*; From 38fdd170674b08212b7506a29c935d174349d8f3 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 16 Apr 2018 16:43:32 -0400 Subject: [PATCH 03/13] Add initializing log message to server Handy when gesesis block is large. --- src/bin/testnode.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 3fa995a718..28e65de792 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -62,8 +62,9 @@ fn main() { exit(1); } + eprintln!("Initializing..."); let mut entries = buffer.lines().map(|line| { - serde_json::from_str(&line).unwrap_or_else(|e| { + serde_json::from_str(&line.unwrap()).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }) @@ -99,8 +100,8 @@ fn main() { stdout(), historian, ))); - eprintln!("Listening on {}", addr); let threads = AccountantSkel::serve(&skel, &addr, exit.clone()).unwrap(); + eprintln!("Ready. Listening on {}", addr); for t in threads { t.join().expect("join"); } From 3215dcff78df923b30cd2231cda6f6c912fd92d6 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 16 Apr 2018 16:44:24 -0400 Subject: [PATCH 04/13] Update readme for new demo Need to create a bunch of unrelated accounts to the genesis block so that transactions can be processed in parallel without waiting on write-locks. And then stuff the private keys of those accounts into mint.json so that the client-demo can send the tokens from those accounts. --- README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 611be8a836..f476dbe5b4 100644 --- a/README.md +++ b/README.md @@ -39,25 +39,25 @@ $ cd solana The testnode server is initialized with a ledger from stdin and generates new ledger entries on stdout. To create the input ledger, we'll need to create *the mint* and use it to generate a *genesis ledger*. It's done in -two steps because the mint.json file contains a private key that will be +two steps because the mint-demo.json file contains private keys that will be used later in this demo. ```bash - $ echo 1000000000 | cargo run --release --bin solana-mint | tee mint.json - $ cat mint.json | cargo run --release --bin solana-genesis | tee genesis.log + $ echo 1000000000 | cargo run --release --bin solana-mint-demo > mint-demo.json + $ cat mint-demo.json | cargo run --release --bin solana-genesis-demo > genesis.log ``` Now you can start the server: ```bash - $ cat genesis.log | cargo run --release --bin solana-testnode | tee transactions0.log + $ cat genesis.log | cargo run --release --bin solana-testnode > transactions0.log ``` Then, in a separate shell, let's execute some transactions. Note we pass in the JSON configuration file here, not the genesis ledger. ```bash - $ cat mint.json | cargo run --release --bin solana-client-demo + $ cat mint-demo.json | cargo run --release --bin solana-client-demo ``` Now kill the server with Ctrl-C, and take a look at the ledger. You should @@ -73,14 +73,14 @@ Now restart the server from where we left off. Pass it both the genesis ledger, the transaction ledger. ```bash - $ cat genesis.log transactions0.log | cargo run --release --bin solana-testnode | tee transactions1.log + $ cat genesis.log transactions0.log | cargo run --release --bin solana-testnode > transactions1.log ``` Lastly, run the client demo again, and verify that all funds were spent in the previous round, and so no additional transactions are added. ```bash - $ cat mint.json | cargo run --release --bin solana-client-demo + $ cat mint-demo.json | cargo run --release --bin solana-client-demo ``` Stop the server again, and verify there are only Tick entries, and no Transaction entries. From 583f652197149e39920b809178ec323e6d429df5 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 16 Apr 2018 16:48:59 -0400 Subject: [PATCH 05/13] Generate genesis log for the demo This log contains a bunch of transactions that generate new accounts, so that transactions to and from them can be processed in parallel. --- src/bin/genesis-demo.rs | 55 ++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index 340f09c6cf..97ab45e613 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -1,21 +1,23 @@ extern crate isatty; +extern crate rayon; +extern crate ring; extern crate serde_json; extern crate solana; +extern crate untrusted; use isatty::stdin_isatty; -use solana::entry::create_entry; +use solana::entry::{create_entry, next_tick}; use solana::event::Event; -use solana::hash::Hash; -use solana::mint::Mint; -use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; +use solana::accountant::MAX_ENTRY_IDS; +use solana::mint::MintDemo; +use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Transaction; use std::io::{stdin, Read}; use std::process::exit; +use rayon::prelude::*; +use untrusted::Input; -fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event { - Event::Transaction(Transaction::new(from, to, tokens, last_id)) -} - +// Generate a ledger with lots and lots of accounts. fn main() { if stdin_isatty() { eprintln!("nothing found on stdin, expected a json file"); @@ -29,20 +31,39 @@ fn main() { exit(1); } - let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| { + let demo: MintDemo = serde_json::from_reader(stdin()).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }); - let mut entries = mint.create_entries(); - let from = mint.keypair(); - let seed = mint.seed(); - let alice = (KeyPair::new().pubkey(), 200); - let bob = (KeyPair::new().pubkey(), 100); - let events = vec![transfer(&from, alice, seed), transfer(&from, bob, seed)]; - entries.push(create_entry(&seed, 0, events)); + let num_accounts = demo.users.len(); + let last_id = demo.mint.last_id(); + let mint_keypair = demo.mint.keypair(); - for entry in entries { + eprintln!("Signing {} transactions...", num_accounts); + let events: Vec<_> = demo.users + .into_par_iter() + .map(|(pkcs8, tokens)| { + let rando = KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap(); + let tr = Transaction::new(&mint_keypair, rando.pubkey(), tokens, last_id); + Event::Transaction(tr) + }) + .collect(); + + for entry in demo.mint.create_entries() { + println!("{}", serde_json::to_string(&entry).unwrap()); + } + + eprintln!("Logging the creation of {} accounts...", num_accounts); + let entry = create_entry(&last_id, 0, events); + println!("{}", serde_json::to_string(&entry).unwrap()); + + eprintln!("Creating {} empty entries...", MAX_ENTRY_IDS); + // Offer client lots of entry IDs to use for each transaction's last_id. + let mut last_id = last_id; + for _ in 0..MAX_ENTRY_IDS { + let entry = next_tick(&last_id, 1); + last_id = entry.id; let serialized = serde_json::to_string(&entry).unwrap_or_else(|e| { eprintln!("failed to serialize: {}", e); exit(1); From 58860ed19f8a0d8eeeeb517f3f6ecf190eb1edd1 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 16 Apr 2018 16:51:06 -0400 Subject: [PATCH 06/13] WIP: New demo that makes better use of the parallelized accountant --- src/bin/client-demo.rs | 78 ++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 44 deletions(-) diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index e4ca981597..e552f31b84 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -4,21 +4,23 @@ extern crate isatty; extern crate rayon; extern crate serde_json; extern crate solana; +extern crate untrusted; use futures::Future; use getopts::Options; use isatty::stdin_isatty; use rayon::prelude::*; use solana::accountant_stub::AccountantStub; -use solana::mint::Mint; +use solana::mint::MintDemo; use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Transaction; use std::env; use std::io::{stdin, Read}; use std::net::UdpSocket; use std::process::exit; -use std::thread::sleep; -use std::time::{Duration, Instant}; +use std::time::Instant; +use untrusted::Input; +//use std::sync::mpsc::sync_channel; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); @@ -75,37 +77,33 @@ fn main() { exit(1); } - let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| { + let demo: MintDemo = serde_json::from_reader(stdin()).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }); - let mint_keypair = mint.keypair(); - let mint_pubkey = mint.pubkey(); let socket = UdpSocket::bind(&send_addr).unwrap(); - println!("Stub new"); let acc = AccountantStub::new(&addr, socket); println!("Get last id"); let last_id = acc.get_last_id().wait().unwrap(); - println!("Get Balance"); - let mint_balance = acc.get_balance(&mint_pubkey).wait().unwrap(); - println!("Mint's Initial Balance {}", mint_balance); + let txs = demo.users.len() / 2; + let keypairs: Vec<_> = demo.users + .into_par_iter() + .map(|(pkcs8, _)| KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap()) + .collect(); + let keypair_pairs: Vec<_> = keypairs.chunks(2).collect(); println!("Signing transactions..."); - let txs = 1_000_000; let now = Instant::now(); - let transactions: Vec<_> = (0..txs) + let transactions: Vec<_> = keypair_pairs .into_par_iter() - .map(|_| { - let rando_pubkey = KeyPair::new().pubkey(); - Transaction::new(&mint_keypair, rando_pubkey, 1, last_id) - }) + .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id)) .collect(); let duration = now.elapsed(); - let ns = duration.as_secs() * 2_000_000_000 + u64::from(duration.subsec_nanos()); - let bsps = f64::from(txs) / ns as f64; - let nsps = ns as f64 / f64::from(txs); + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); + let bsps = txs as f64 / ns as f64; + let nsps = ns as f64 / txs as f64; println!( "Done. {} thousand signatures per second, {}us per signature", bsps * 1_000_000_f64, @@ -116,33 +114,25 @@ fn main() { let now = Instant::now(); let sz = transactions.len() / threads; let chunks: Vec<_> = transactions.chunks(sz).collect(); - let _: Vec<_> = chunks - .into_par_iter() - .map(|trs| { - println!("Transferring 1 unit {} times...", trs.len()); - let send_addr = "0.0.0.0:0"; - let socket = UdpSocket::bind(send_addr).unwrap(); - let acc = AccountantStub::new(&addr, socket); - for tr in trs { - acc.transfer_signed(tr.clone()).unwrap(); - } - () - }) - .collect(); - println!("Waiting for last transaction to be confirmed...",); - let mut val = mint_balance; - let mut prev = 0; - while val != prev { - sleep(Duration::from_millis(20)); - prev = val; - val = acc.get_balance(&mint_pubkey).wait().unwrap(); - } - println!("Mint's Final Balance {}", val); - let txs = mint_balance - val; - println!("Successful transactions {}", txs); + chunks.into_par_iter().for_each(|trs| { + println!("Transferring 1 unit {} times...", trs.len()); + let send_addr = "0.0.0.0:0"; + let socket = UdpSocket::bind(send_addr).unwrap(); + //let (entry_info_sender, receiver) = sync_channel(1000); + //let acc = AccountantStub::new_thin_client(&addr, socket, entry_info_sender); + let acc = AccountantStub::new(&addr, socket); + for tr in trs { + acc.transfer_signed(tr.clone()).unwrap(); + } + + println!("Waiting for the server to go idle...",); + //while receiver.recv().unwrap().num_events > 0 {} + }); + + println!("Sent transactions {}", txs); let duration = now.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let tps = (txs * 1_000_000_000) as f64 / ns as f64; - println!("Done. {} tps!", tps); + println!("Done. If no packets dropped, {} tps", tps); } From ea8bfb46ceb261295923c237e301954e7ff669a8 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 17 Apr 2018 17:31:52 -0400 Subject: [PATCH 07/13] Add a way to subscribe for new entry metadata --- src/accountant_skel.rs | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b4bf805c63..b10b2d7321 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -33,6 +33,7 @@ pub struct AccountantSkel { last_id: Hash, writer: W, historian: Historian, + entry_info_subscribers: Vec, } #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] @@ -41,6 +42,19 @@ pub enum Request { Transaction(Transaction), GetBalance { key: PublicKey }, GetLastId, + Subscribe { subscriptions: Vec }, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Subscription { + EntryInfo, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EntryInfo { + id: Hash, + num_hashes: u64, + num_events: u64, } impl Request { @@ -68,6 +82,22 @@ impl AccountantSkel { last_id, writer, historian, + entry_info_subscribers: vec![], + } + } + + fn notify_entry_info_subscribers(&mut self, entry: &Entry) { + // TODO: No need to bind(). + let socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + + for addr in &self.entry_info_subscribers { + let entry_info = EntryInfo { + id: entry.id, + num_hashes: entry.num_hashes, + num_events: entry.events.len() as u64, + }; + let data = serialize(&entry_info).expect("serialize EntryInfo"); + let _res = socket.send_to(&data, addr); } } @@ -77,6 +107,7 @@ impl AccountantSkel { self.last_id = entry.id; self.acc.register_entry_id(&self.last_id); writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); + self.notify_entry_info_subscribers(&entry); } self.last_id } @@ -94,6 +125,14 @@ impl AccountantSkel { } Request::GetLastId => Some((Response::LastId { id: self.sync() }, rsp_addr)), Request::Transaction(_) => unreachable!(), + Request::Subscribe { subscriptions } => { + for subscription in subscriptions { + match subscription { + Subscription::EntryInfo => self.entry_info_subscribers.push(rsp_addr), + } + } + None + } } } @@ -242,6 +281,9 @@ impl AccountantSkel { blob_sender.send(blobs)?; } packet_recycler.recycle(msgs); + + // Write new entries to the ledger and notify subscribers. + obj.lock().unwrap().sync(); } Ok(()) } From 6badc9851055846936f75546ed3c2aa16e7e95a6 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 17 Apr 2018 18:14:53 -0400 Subject: [PATCH 08/13] Add low-level response-handling functions to skel --- src/accountant_skel.rs | 10 ++++----- src/accountant_stub.rs | 46 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b10b2d7321..e4030dff39 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -52,9 +52,9 @@ pub enum Subscription { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct EntryInfo { - id: Hash, - num_hashes: u64, - num_events: u64, + pub id: Hash, + pub num_hashes: u64, + pub num_events: u64, } impl Request { @@ -70,7 +70,7 @@ impl Request { #[derive(Serialize, Deserialize, Debug)] pub enum Response { Balance { key: PublicKey, val: Option }, - Entries { entries: Vec }, + EntryInfo(EntryInfo), LastId { id: Hash }, } @@ -96,7 +96,7 @@ impl AccountantSkel { num_hashes: entry.num_hashes, num_events: entry.events.len() as u64, }; - let data = serialize(&entry_info).expect("serialize EntryInfo"); + let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); let _res = socket.send_to(&data, addr); } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 5edfa6a053..5fc31d8784 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -3,11 +3,12 @@ //! this object instead of writing messages to the network directly. The binary //! encoding of its messages are unstable and may change in future releases. -use accountant_skel::{Request, Response}; +use accountant_skel::{Request, Response, Subscription}; use bincode::{deserialize, serialize}; use futures::future::{err, ok, FutureResult}; use hash::Hash; use signature::{KeyPair, PublicKey, Signature}; +use std::collections::HashMap; use std::io; use std::net::UdpSocket; use transaction::Transaction; @@ -15,6 +16,9 @@ use transaction::Transaction; pub struct AccountantStub { pub addr: String, pub socket: UdpSocket, + last_id: Option, + num_events: u64, + balances: HashMap, } impl AccountantStub { @@ -25,6 +29,40 @@ impl AccountantStub { AccountantStub { addr: addr.to_string(), socket, + last_id: None, + num_events: 0, + balances: HashMap::new(), + } + } + + pub fn init(&self) { + let subscriptions = vec![Subscription::EntryInfo]; + let req = Request::Subscribe { subscriptions }; + let data = serialize(&req).expect("serialize GetBalance"); + let _res = self.socket.send_to(&data, &self.addr); + } + + pub fn recv_response(&self) -> io::Result { + let mut buf = vec![0u8; 1024]; + self.socket.recv_from(&mut buf)?; + let resp = deserialize(&buf).expect("deserialize balance"); + Ok(resp) + } + + pub fn process_response(&mut self, resp: Response) { + match resp { + Response::Balance { key, val } => { + if let Some(val) = val { + self.balances.insert(key, val); + } + } + Response::LastId { id } => { + self.last_id = Some(id); + } + Response::EntryInfo(entry_info) => { + self.last_id = Some(entry_info.id); + self.num_events += entry_info.num_events; + } } } @@ -89,6 +127,12 @@ impl AccountantStub { } ok(Default::default()) } + + /// Return the number of transactions the server processed since creating + /// this stub instance. + pub fn get_transaction_count(&self) -> u64 { + self.num_events + } } #[cfg(test)] From d5d133353f1b3766e06886f12bc0b95c3665f7a2 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 17 Apr 2018 18:30:41 -0400 Subject: [PATCH 09/13] Port blocking stub functions to new stateful ones --- src/accountant_skel.rs | 2 +- src/accountant_stub.rs | 38 ++++++++++++-------------------------- src/bin/client-demo.rs | 2 +- src/bin/testnode.rs | 2 +- 4 files changed, 15 insertions(+), 29 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index e4030dff39..0178815e78 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -468,7 +468,7 @@ mod tests { let socket = UdpSocket::bind(send_addr).unwrap(); socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let acc = AccountantStub::new(&addr, socket); + let mut acc = AccountantStub::new(&addr, socket); let last_id = acc.get_last_id().wait().unwrap(); let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 5fc31d8784..224fb5c01e 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -5,7 +5,7 @@ use accountant_skel::{Request, Response, Subscription}; use bincode::{deserialize, serialize}; -use futures::future::{err, ok, FutureResult}; +use futures::future::{ok, FutureResult}; use hash::Hash; use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; @@ -18,7 +18,7 @@ pub struct AccountantStub { pub socket: UdpSocket, last_id: Option, num_events: u64, - balances: HashMap, + balances: HashMap>, } impl AccountantStub { @@ -52,9 +52,7 @@ impl AccountantStub { pub fn process_response(&mut self, resp: Response) { match resp { Response::Balance { key, val } => { - if let Some(val) = val { - self.balances.insert(key, val); - } + self.balances.insert(key, val); } Response::LastId { id } => { self.last_id = Some(id); @@ -90,42 +88,30 @@ impl AccountantStub { /// Request the balance of the user holding `pubkey`. This method blocks /// until the server sends a response. If the response packet is dropped /// by the network, this method will hang indefinitely. - pub fn get_balance(&self, pubkey: &PublicKey) -> FutureResult { + pub fn get_balance(&mut self, pubkey: &PublicKey) -> FutureResult { let req = Request::GetBalance { key: *pubkey }; let data = serialize(&req).expect("serialize GetBalance"); self.socket .send_to(&data, &self.addr) .expect("buffer error"); - let mut buf = vec![0u8; 1024]; - self.socket.recv_from(&mut buf).expect("buffer error"); - let resp = deserialize(&buf).expect("deserialize balance"); - if let Response::Balance { key, val } = resp { - assert_eq!(key, *pubkey); - return match val { - Some(x) => ok(x), - _ => err(0), - }; - } - err(0) + let resp = self.recv_response().expect("recv response"); + self.process_response(resp); + ok(self.balances[pubkey].unwrap()) } /// Request the last Entry ID from the server. This method blocks /// until the server sends a response. At the time of this writing, /// it also has the side-effect of causing the server to log any /// entries that have been published by the Historian. - pub fn get_last_id(&self) -> FutureResult { + pub fn get_last_id(&mut self) -> FutureResult { let req = Request::GetLastId; let data = serialize(&req).expect("serialize GetId"); self.socket .send_to(&data, &self.addr) .expect("buffer error"); - let mut buf = vec![0u8; 1024]; - self.socket.recv_from(&mut buf).expect("buffer error"); - let resp = deserialize(&buf).expect("deserialize Id"); - if let Response::LastId { id } = resp { - return ok(id); - } - ok(Default::default()) + let resp = self.recv_response().expect("recv response"); + self.process_response(resp); + ok(self.last_id.unwrap_or(Hash::default())) } /// Return the number of transactions the server processed since creating @@ -172,7 +158,7 @@ mod tests { let socket = UdpSocket::bind(send_addr).unwrap(); socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let acc = AccountantStub::new(addr, socket); + let mut acc = AccountantStub::new(addr, socket); let last_id = acc.get_last_id().wait().unwrap(); let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index e552f31b84..559c87cdb9 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -83,7 +83,7 @@ fn main() { }); let socket = UdpSocket::bind(&send_addr).unwrap(); - let acc = AccountantStub::new(&addr, socket); + let mut acc = AccountantStub::new(&addr, socket); println!("Get last id"); let last_id = acc.get_last_id().wait().unwrap(); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 28e65de792..4c34a3a27e 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -64,7 +64,7 @@ fn main() { eprintln!("Initializing..."); let mut entries = buffer.lines().map(|line| { - serde_json::from_str(&line.unwrap()).unwrap_or_else(|e| { + serde_json::from_str(&line).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }) From a15e30d4b3c4918c3d3b78c56bf16993d179cc3c Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 17 Apr 2018 18:41:58 -0400 Subject: [PATCH 10/13] Report transactions processed --- src/accountant_stub.rs | 20 ++++++++++++++------ src/bin/client-demo.rs | 22 +++++++++++++++------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 224fb5c01e..9c7c410497 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -26,19 +26,21 @@ impl AccountantStub { /// over `socket`. To receive responses, the caller must bind `socket` /// to a public address before invoking AccountantStub methods. pub fn new(addr: &str, socket: UdpSocket) -> Self { - AccountantStub { + let stub = AccountantStub { addr: addr.to_string(), socket, last_id: None, num_events: 0, balances: HashMap::new(), - } + }; + stub.init(); + stub } pub fn init(&self) { let subscriptions = vec![Subscription::EntryInfo]; let req = Request::Subscribe { subscriptions }; - let data = serialize(&req).expect("serialize GetBalance"); + let data = serialize(&req).expect("serialize Subscribe"); let _res = self.socket.send_to(&data, &self.addr); } @@ -94,8 +96,14 @@ impl AccountantStub { self.socket .send_to(&data, &self.addr) .expect("buffer error"); - let resp = self.recv_response().expect("recv response"); - self.process_response(resp); + let mut done = false; + while !done { + let resp = self.recv_response().expect("recv response"); + if let &Response::Balance { ref key, .. } = &resp { + done = key == pubkey; + } + self.process_response(resp); + } ok(self.balances[pubkey].unwrap()) } @@ -116,7 +124,7 @@ impl AccountantStub { /// Return the number of transactions the server processed since creating /// this stub instance. - pub fn get_transaction_count(&self) -> u64 { + pub fn transaction_count(&self) -> u64 { self.num_events } } diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 559c87cdb9..831d2521b5 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -18,9 +18,9 @@ use std::env; use std::io::{stdin, Read}; use std::net::UdpSocket; use std::process::exit; -use std::time::Instant; +use std::thread::sleep; +use std::time::{Duration, Instant}; use untrusted::Input; -//use std::sync::mpsc::sync_channel; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); @@ -110,6 +110,8 @@ fn main() { nsps / 1_000_f64 ); + let initial_tx_count = acc.transaction_count(); + println!("Transfering {} transactions in {} batches", txs, threads); let now = Instant::now(); let sz = transactions.len() / threads; @@ -118,17 +120,23 @@ fn main() { println!("Transferring 1 unit {} times...", trs.len()); let send_addr = "0.0.0.0:0"; let socket = UdpSocket::bind(send_addr).unwrap(); - //let (entry_info_sender, receiver) = sync_channel(1000); - //let acc = AccountantStub::new_thin_client(&addr, socket, entry_info_sender); let acc = AccountantStub::new(&addr, socket); for tr in trs { acc.transfer_signed(tr.clone()).unwrap(); } - - println!("Waiting for the server to go idle...",); - //while receiver.recv().unwrap().num_events > 0 {} }); + let mut tx_count = acc.transaction_count(); + println!("tx count {}", tx_count); + let mut prev_tx_count = tx_count + 1; + + println!("Waiting for the server to go idle...",); + while tx_count != prev_tx_count { + sleep(Duration::from_millis(20)); + prev_tx_count = tx_count; + tx_count = acc.transaction_count(); + } + let txs = tx_count - initial_tx_count; println!("Sent transactions {}", txs); let duration = now.elapsed(); From b60a98bd6e8ad1703ccf7e3bfeea8ef5abf078bc Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 20 Apr 2018 23:28:55 -0600 Subject: [PATCH 11/13] Startup log can reference IDs without itself --- src/accountant_stub.rs | 20 +++++++++++++++++--- src/bin/client-demo.rs | 8 +++++--- src/bin/testnode.rs | 5 ++++- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 9c7c410497..e23c1c3851 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -117,14 +117,28 @@ impl AccountantStub { self.socket .send_to(&data, &self.addr) .expect("buffer error"); - let resp = self.recv_response().expect("recv response"); - self.process_response(resp); + let mut done = false; + while !done { + let resp = self.recv_response().expect("recv response"); + if let &Response::LastId { .. } = &resp { + done = true; + } + self.process_response(resp); + } ok(self.last_id.unwrap_or(Hash::default())) } /// Return the number of transactions the server processed since creating /// this stub instance. - pub fn transaction_count(&self) -> u64 { + pub fn transaction_count(&mut self) -> u64 { + self.socket.set_nonblocking(true).expect("set nonblocking"); + loop { + match self.recv_response() { + Err(_) => break, + Ok(resp) => self.process_response(resp), + } + } + self.socket.set_nonblocking(false).expect("set blocking"); self.num_events } } diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 831d2521b5..57bf01b297 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -77,6 +77,7 @@ fn main() { exit(1); } + println!("Parsing stdin..."); let demo: MintDemo = serde_json::from_reader(stdin()).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); @@ -84,9 +85,11 @@ fn main() { let socket = UdpSocket::bind(&send_addr).unwrap(); let mut acc = AccountantStub::new(&addr, socket); - println!("Get last id"); + + println!("Get last ID..."); let last_id = acc.get_last_id().wait().unwrap(); + println!("Creating keypairs..."); let txs = demo.users.len() / 2; let keypairs: Vec<_> = demo.users .into_par_iter() @@ -127,7 +130,6 @@ fn main() { }); let mut tx_count = acc.transaction_count(); - println!("tx count {}", tx_count); let mut prev_tx_count = tx_count + 1; println!("Waiting for the server to go idle...",); @@ -142,5 +144,5 @@ fn main() { let duration = now.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let tps = (txs * 1_000_000_000) as f64 / ns as f64; - println!("Done. If no packets dropped, {} tps", tps); + println!("Done. {} tps", tps); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 4c34a3a27e..8b6da2c1c8 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -72,7 +72,7 @@ fn main() { // The first item in the ledger is required to be an entry with zero num_hashes, // which implies its id can be used as the ledger's seed. - entries.next().unwrap(); + let entry0 = entries.next().unwrap(); // The second item in the ledger is a special transaction where the to and from // fields are the same. That entry should be treated as a deposit, not a @@ -85,11 +85,14 @@ fn main() { }; let acc = Accountant::new_from_deposit(&deposit.unwrap()); + acc.register_entry_id(&entry0.id); + acc.register_entry_id(&entry1.id); let mut last_id = entry1.id; for entry in entries { last_id = entry.id; acc.process_verified_events(entry.events).unwrap(); + acc.register_entry_id(&last_id); } let historian = Historian::new(&last_id, Some(1000)); From 9ed953e8c372b92f36cf8d3eaaba064fee4ccb10 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 26 Apr 2018 09:35:10 -0600 Subject: [PATCH 12/13] Fix rebase fails --- src/bin/client-demo.rs | 2 +- src/bin/genesis-demo.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 57bf01b297..18a1a7f660 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -78,7 +78,7 @@ fn main() { } println!("Parsing stdin..."); - let demo: MintDemo = serde_json::from_reader(stdin()).unwrap_or_else(|e| { + let demo: MintDemo = serde_json::from_str(&buffer).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }); diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index 97ab45e613..522a74abad 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -31,7 +31,7 @@ fn main() { exit(1); } - let demo: MintDemo = serde_json::from_reader(stdin()).unwrap_or_else(|e| { + let demo: MintDemo = serde_json::from_str(&buffer).unwrap_or_else(|e| { eprintln!("failed to parse json: {}", e); exit(1); }); From d415b171460d1d1a348c3e0b15b502441928c97a Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 26 Apr 2018 13:17:36 -0600 Subject: [PATCH 13/13] sleepless demo to complement sleepless nights 18 ktps on macbook pro, no gpu --- README.md | 3 +++ src/accountant_skel.rs | 10 ++++++++-- src/accountant_stub.rs | 11 +++++++++++ src/bin/client-demo.rs | 13 ++++--------- src/bin/genesis-demo.rs | 4 ++-- src/bin/mint-demo.rs | 4 ++-- 6 files changed, 30 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index f476dbe5b4..729d99ac72 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,9 @@ Now you can start the server: $ cat genesis.log | cargo run --release --bin solana-testnode > transactions0.log ``` +Wait a few seconds for the server to initialize. It will print "Ready." when it's safe +to start sending it transactions. + Then, in a separate shell, let's execute some transactions. Note we pass in the JSON configuration file here, not the genesis ledger. diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 0178815e78..c3811b4214 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -285,6 +285,7 @@ impl AccountantSkel { // Write new entries to the ledger and notify subscribers. obj.lock().unwrap().sync(); } + Ok(()) } @@ -328,8 +329,13 @@ impl AccountantSkel { &packet_recycler, &blob_recycler, ); - if e.is_err() && exit.load(Ordering::Relaxed) { - break; + if e.is_err() { + // Assume this was a timeout, so sync any empty entries. + skel.lock().unwrap().sync(); + + if exit.load(Ordering::Relaxed) { + break; + } } }); Ok(vec![t_receiver, t_responder, t_server, t_verifier]) diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index e23c1c3851..2dd331da7b 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -131,6 +131,17 @@ impl AccountantStub { /// Return the number of transactions the server processed since creating /// this stub instance. pub fn transaction_count(&mut self) -> u64 { + // Wait for at least one EntryInfo. + let mut done = false; + while !done { + let resp = self.recv_response().expect("recv response"); + if let &Response::EntryInfo(_) = &resp { + done = true; + } + self.process_response(resp); + } + + // Then take the rest. self.socket.set_nonblocking(true).expect("set nonblocking"); loop { match self.recv_response() { diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 18a1a7f660..50f2e8a2ec 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -18,8 +18,7 @@ use std::env; use std::io::{stdin, Read}; use std::net::UdpSocket; use std::process::exit; -use std::thread::sleep; -use std::time::{Duration, Instant}; +use std::time::Instant; use untrusted::Input; fn print_usage(program: &str, opts: Options) { @@ -129,17 +128,13 @@ fn main() { } }); + println!("Waiting for half the transactions to complete...",); let mut tx_count = acc.transaction_count(); - let mut prev_tx_count = tx_count + 1; - - println!("Waiting for the server to go idle...",); - while tx_count != prev_tx_count { - sleep(Duration::from_millis(20)); - prev_tx_count = tx_count; + while tx_count < transactions.len() as u64 / 2 { tx_count = acc.transaction_count(); } let txs = tx_count - initial_tx_count; - println!("Sent transactions {}", txs); + println!("Transactions processed {}", txs); let duration = now.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index 522a74abad..68c10b9928 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -6,15 +6,15 @@ extern crate solana; extern crate untrusted; use isatty::stdin_isatty; +use rayon::prelude::*; +use solana::accountant::MAX_ENTRY_IDS; use solana::entry::{create_entry, next_tick}; use solana::event::Event; -use solana::accountant::MAX_ENTRY_IDS; use solana::mint::MintDemo; use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Transaction; use std::io::{stdin, Read}; use std::process::exit; -use rayon::prelude::*; use untrusted::Input; // Generate a ledger with lots and lots of accounts. diff --git a/src/bin/mint-demo.rs b/src/bin/mint-demo.rs index 1a2d9485a1..1127ce19b5 100644 --- a/src/bin/mint-demo.rs +++ b/src/bin/mint-demo.rs @@ -3,11 +3,11 @@ extern crate ring; extern crate serde_json; extern crate solana; +use rayon::prelude::*; +use ring::rand::SystemRandom; use solana::mint::{Mint, MintDemo}; use solana::signature::KeyPair; use std::io; -use rayon::prelude::*; -use ring::rand::SystemRandom; fn main() { let mut input_text = String::new();