diff --git a/benches/bank.rs b/benches/bank.rs index 8c4cde215a..0211f58170 100644 --- a/benches/bank.rs +++ b/benches/bank.rs @@ -28,17 +28,14 @@ fn bench_process_transaction(bencher: &mut Bencher) { &mint.keypair(), rando0.pubkey(), 10_000, - mint.last_id(), + bank.last_id(), 0, ); assert_eq!(bank.process_transaction(&tx), Ok(())); // Seed the 'to' account and a cell for its signature. - let last_id = hash(&serialize(&i).unwrap()); // Unique hash - bank.register_entry_id(&last_id); - let rando1 = Keypair::new(); - let tx = Transaction::system_move(&rando0, rando1.pubkey(), 1, last_id, 0); + let tx = Transaction::system_move(&rando0, rando1.pubkey(), 1, bank.last_id(), 0); assert_eq!(bank.process_transaction(&tx), Ok(())); // Finally, return the transaction to the benchmark. diff --git a/src/bank.rs b/src/bank.rs index 9be12cf325..44d1275c75 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -47,7 +47,8 @@ use window::WINDOW_SIZE; /// but requires clients to update its `last_id` more frequently. Raising the value /// lengthens the time a client must wait to be certain a missing transaction will /// not be processed by the network. -pub const MAX_ENTRY_IDS: usize = 1024 * 32; +pub const NUM_TICKS_PER_SECOND: usize = 10; +pub const MAX_ENTRY_IDS: usize = NUM_TICKS_PER_SECOND * 120; pub const VERIFY_BLOCK_SIZE: usize = 16; @@ -106,9 +107,12 @@ type SignatureStatusMap = HashMap>; #[derive(Default)] struct ErrorCounters { - account_not_found_validator: usize, - account_not_found_leader: usize, + account_not_found: usize, account_in_use: usize, + last_id_not_found: usize, + reserve_last_id: usize, + insufficient_funds: usize, + duplicate_signature: usize, } /// The state of all accounts and contracts after processing its entries. pub struct Bank { @@ -226,6 +230,25 @@ impl Bank { } } + /// Return the position of the last_id in the last_id_queue starting from the back + /// If the last_id is not found last_id_queue.len() is returned + fn compute_entry_id_age(last_id_queue: &VecDeque, entry_id: Hash) -> Option { + for (i, id) in last_id_queue.iter().rev().enumerate() { + if *id == entry_id { + return Some(i); + } + } + None + } + /// Check if the age of the entry_id is within the max_age + /// return false for any entries with an age equal to or above max_age + fn check_entry_id_age(last_id_queue: &VecDeque, entry_id: Hash, max_age: usize) -> bool { + match Self::compute_entry_id_age(last_id_queue, entry_id) { + Some(age) if age < max_age => true, + _ => false, + } + } + fn reserve_signature_with_last_id( last_ids_sigs: &mut HashMap, last_id: &Hash, @@ -308,6 +331,7 @@ impl Bank { /// the oldest ones once its internal cache is full. Once boot, the /// bank will reject transactions using that `last_id`. pub fn register_entry_id(&self, last_id: &Hash) { + // this must be locked first! let mut last_ids = self .last_ids .write() @@ -318,8 +342,10 @@ impl Bank { .expect("last_ids_sigs write lock"); if last_ids.len() >= MAX_ENTRY_IDS { let id = last_ids.pop_front().unwrap(); + info!("removing last_id {}", id); last_ids_sigs.remove(&id); } + inc_new_counter_info!("bank-register_entry_id-registered", 1); last_ids_sigs.insert(*last_id, (HashMap::new(), timestamp())); last_ids.push_back(*last_id); } @@ -367,29 +393,38 @@ impl Bank { tx: &Transaction, accounts: &HashMap, last_ids_sigs: &mut HashMap, + last_ids: &VecDeque, + max_age: usize, error_counters: &mut ErrorCounters, ) -> Result> { // Copy all the accounts if accounts.get(&tx.account_keys[0]).is_none() { - if !self.is_leader { - error_counters.account_not_found_validator += 1; - } else { - error_counters.account_not_found_leader += 1; - } + error_counters.account_not_found += 1; Err(BankError::AccountNotFound) } else if accounts.get(&tx.account_keys[0]).unwrap().tokens < tx.fee { + error_counters.insufficient_funds += 1; Err(BankError::InsufficientFundsForFee) } else { + if !Self::check_entry_id_age(last_ids, tx.last_id, max_age) { + error_counters.last_id_not_found += 1; + return Err(BankError::LastIdNotFound); + } + + // There is no way to predict what contract will execute without an error + // If a fee can pay for execution then the contract will be scheduled + let err = + Self::reserve_signature_with_last_id(last_ids_sigs, &tx.last_id, &tx.signature); + if let Err(BankError::LastIdNotFound) = err { + error_counters.reserve_last_id += 1; + } else if let Err(BankError::DuplicateSignature) = err { + error_counters.duplicate_signature += 1; + } + err?; let mut called_accounts: Vec = tx .account_keys .iter() .map(|key| accounts.get(key).cloned().unwrap_or_default()) .collect(); - // There is no way to predict what contract will execute without an error - // If a fee can pay for execution then the contract will be scheduled - let err = - Self::reserve_signature_with_last_id(last_ids_sigs, &tx.last_id, &tx.signature); - err?; called_accounts[0].tokens -= tx.fee; Ok(called_accounts) } @@ -405,10 +440,12 @@ impl Bank { .iter() .map(|tx| Self::lock_account(&mut account_locks, &tx.account_keys, &mut error_counters)) .collect(); - inc_new_counter_info!( - "bank-process_transactions-account_in_use", - error_counters.account_in_use - ); + if error_counters.account_in_use != 0 { + inc_new_counter_info!( + "bank-process_transactions-account_in_use", + error_counters.account_in_use + ); + } rv } @@ -425,14 +462,24 @@ impl Bank { &self, txs: &[Transaction], results: Vec>, + max_age: usize, error_counters: &mut ErrorCounters, ) -> Vec<(Result>)> { let accounts = self.accounts.read().unwrap(); + // this must be locked first! + let last_ids = self.last_ids.read().unwrap(); let mut last_sigs = self.last_ids_sigs.write().unwrap(); txs.iter() .zip(results.into_iter()) .map(|etx| match etx { - (tx, Ok(())) => self.load_account(tx, &accounts, &mut last_sigs, error_counters), + (tx, Ok(())) => self.load_account( + tx, + &accounts, + &mut last_sigs, + &last_ids, + max_age, + error_counters, + ), (_, Err(e)) => Err(e), }).collect() } @@ -657,7 +704,11 @@ impl Bank { let locked_accounts = self.lock_accounts(txs); let lock_time = now.elapsed(); let now = Instant::now(); - let results = self.execute_and_commit_transactions(txs, locked_accounts); + // Use a shorter maximum age when adding transactions into the pipeline. This will reduce + // the likelyhood of any single thread getting starved and processing old ids. + // TODO: Banking stage threads should be prioratized to complete faster then this queue + // expires. + let results = self.execute_and_commit_transactions(txs, locked_accounts, MAX_ENTRY_IDS / 2); let process_time = now.elapsed(); let now = Instant::now(); self.record_transactions(txs, &results, poh)?; @@ -712,11 +763,13 @@ impl Bank { &self, txs: &[Transaction], locked_accounts: Vec>, + max_age: usize, ) -> Vec> { debug!("processing transactions: {}", txs.len()); let mut error_counters = ErrorCounters::default(); let now = Instant::now(); - let mut loaded_accounts = self.load_accounts(txs, locked_accounts, &mut error_counters); + let mut loaded_accounts = + self.load_accounts(txs, locked_accounts, max_age, &mut error_counters); let load_elapsed = now.elapsed(); let now = Instant::now(); let executed: Vec> = loaded_accounts @@ -741,44 +794,59 @@ impl Bank { self.update_transaction_statuses(txs, &executed); let mut tx_count = 0; let mut err_count = 0; - for r in &executed { + for (r, tx) in executed.iter().zip(txs.iter()) { if r.is_ok() { tx_count += 1; } else { if err_count == 0 { - debug!("tx error: {:?}", r); + info!("tx error: {:?} {:?}", r, tx); } err_count += 1; } } if err_count > 0 { info!("{} errors of {} txs", err_count, err_count + tx_count); - if !self.is_leader { - inc_new_counter_info!("bank-process_transactions_err-validator", err_count); - inc_new_counter_info!( - "bank-appy_debits-account_not_found-validator", - error_counters.account_not_found_validator - ); - } else { - inc_new_counter_info!("bank-process_transactions_err-leader", err_count); - inc_new_counter_info!( - "bank-appy_debits-account_not_found-leader", - error_counters.account_not_found_leader - ); - } + inc_new_counter_info!( + "bank-process_transactions-account_not_found", + error_counters.account_not_found + ); inc_new_counter_info!("bank-process_transactions-error_count", err_count); } self.transaction_count .fetch_add(tx_count, Ordering::Relaxed); inc_new_counter_info!("bank-process_transactions-txs", tx_count); + if 0 != error_counters.last_id_not_found { + inc_new_counter_info!( + "bank-process_transactions-error-last_id_not_found", + error_counters.last_id_not_found + ); + } + if 0 != error_counters.reserve_last_id { + inc_new_counter_info!( + "bank-process_transactions-error-reserve_last_id", + error_counters.reserve_last_id + ); + } + if 0 != error_counters.duplicate_signature { + inc_new_counter_info!( + "bank-process_transactions-error-duplicate_signature", + error_counters.duplicate_signature + ); + } + if 0 != error_counters.insufficient_funds { + inc_new_counter_info!( + "bank-process_transactions-error-insufficient_funds", + error_counters.insufficient_funds + ); + } executed } #[must_use] pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { let locked_accounts = self.lock_accounts(txs); - let results = self.execute_and_commit_transactions(txs, locked_accounts); + let results = self.execute_and_commit_transactions(txs, locked_accounts, MAX_ENTRY_IDS); self.unlock_accounts(txs, &results); results } @@ -804,8 +872,9 @@ impl Bank { for result in self.process_transactions(&entry.transactions) { result?; } + } else { + self.register_entry_id(&entry.id); } - self.register_entry_id(&entry.id); Ok(()) } @@ -1235,28 +1304,11 @@ mod tests { let key1 = Keypair::new().pubkey(); let key2 = Keypair::new().pubkey(); let bank = Bank::new(&mint); - - let spend = SystemProgram::Move { tokens: 1 }; - let instructions = vec![ - Instruction { - program_ids_index: 0, - userdata: serialize(&spend).unwrap(), - accounts: vec![0, 1], - }, - Instruction { - program_ids_index: 0, - userdata: serialize(&spend).unwrap(), - accounts: vec![0, 2], - }, - ]; - - let t1 = Transaction::new_with_instructions( + let t1 = Transaction::system_move_many( &mint.keypair(), - &[key1, key2], + &[(key1, 1), (key2, 1)], mint.last_id(), 0, - vec![SystemProgram::id()], - instructions, ); let res = bank.process_transactions(&vec![t1.clone()]); assert_eq!(res.len(), 1); @@ -1488,39 +1540,55 @@ mod tests { mint: &Mint, keypairs: &[Keypair], ) -> impl Iterator { + let mut last_id = mint.last_id(); let mut hash = mint.last_id(); let mut entries: Vec = vec![]; + let mut num_hashes = 0; for k in keypairs { let txs = vec![Transaction::system_new( &mint.keypair(), k.pubkey(), 1, - hash, + last_id, )]; let mut e = ledger::next_entries(&hash, 0, txs); entries.append(&mut e); hash = entries.last().unwrap().id; + let tick = Entry::new_mut(&mut hash, &mut num_hashes, vec![]); + last_id = hash; + entries.push(tick); } entries.into_iter() } - fn create_sample_block(mint: &Mint, length: usize) -> impl Iterator { + // create a ledger with tick entries every `ticks` entries + fn create_sample_block_with_ticks( + mint: &Mint, + length: usize, + ticks: usize, + ) -> impl Iterator { let mut entries = Vec::with_capacity(length); let mut hash = mint.last_id(); + let mut last_id = mint.last_id(); let mut num_hashes = 0; - for _ in 0..length { + for i in 0..length { let keypair = Keypair::new(); - let tx = Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, hash); + let tx = Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, last_id); let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx]); entries.push(entry); + if (i + 1) % ticks == 0 { + let tick = Entry::new_mut(&mut hash, &mut num_hashes, vec![]); + last_id = hash; + entries.push(tick); + } } entries.into_iter() } fn create_sample_ledger(length: usize) -> (impl Iterator, Pubkey) { - let mint = Mint::new(1 + length as i64); + let mint = Mint::new(length as i64 + 1); let genesis = mint.create_entries(); - let block = create_sample_block(&mint, length); + let block = create_sample_block_with_ticks(&mint, length, length); (genesis.into_iter().chain(block), mint.pubkey()) } @@ -1534,7 +1602,7 @@ mod tests { } #[test] - fn test_process_ledger() { + fn test_process_ledger_simple() { let (ledger, pubkey) = create_sample_ledger(1); let (ledger, dup) = ledger.tee(); let bank = Bank::default(); @@ -1542,10 +1610,13 @@ mod tests { .process_ledger(ledger, &mut LeaderScheduler::default()) .unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); - assert_eq!(ledger_height, 3); - assert_eq!(tail.len(), 3); + assert_eq!(ledger_height, 4); + assert_eq!(tail.len(), 4); assert_eq!(tail, dup.collect_vec()); let last_entry = &tail[tail.len() - 1]; + // last entry is a tick + assert_eq!(0, last_entry.transactions.len()); + // tick is registered assert_eq!(bank.last_id(), last_entry.id); } @@ -1566,7 +1637,7 @@ mod tests { .process_ledger(ledger, &mut LeaderScheduler::default()) .unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); - assert_eq!(ledger_height, entry_count as u64 + 2); + assert_eq!(ledger_height, entry_count as u64 + 3); assert!(tail.len() <= window_size); let last_entry = &tail[tail.len() - 1]; assert_eq!(bank.last_id(), last_entry.id); @@ -1598,7 +1669,7 @@ mod tests { fn test_process_ledger_from_files() { let mint = Mint::new(2); let genesis = to_file_iter(mint.create_entries().into_iter()); - let block = to_file_iter(create_sample_block(&mint, 1)); + let block = to_file_iter(create_sample_block_with_ticks(&mint, 1, 1)); let bank = Bank::default(); bank.process_ledger(genesis.chain(block), &mut LeaderScheduler::default()) @@ -1665,7 +1736,8 @@ mod tests { let pay_alice = vec![tx1]; let locked_alice = bank.lock_accounts(&pay_alice); - let results_alice = bank.execute_and_commit_transactions(&pay_alice, locked_alice); + let results_alice = + bank.execute_and_commit_transactions(&pay_alice, locked_alice, MAX_ENTRY_IDS); assert_eq!(results_alice[0], Ok(())); // try executing an interleaved transfer twice @@ -1777,4 +1849,33 @@ mod tests { .contains_key(&signature) ); } + #[test] + fn test_entry_id_age() { + let mut q = VecDeque::new(); + let hash1 = Hash::default(); + let hash2 = hash(hash1.as_ref()); + let hash3 = hash(hash2.as_ref()); + assert_eq!(Bank::compute_entry_id_age(&q, hash1), None); + q.push_back(hash1); + assert_eq!(Bank::compute_entry_id_age(&q, hash1), Some(0)); + q.push_back(hash2); + assert_eq!(Bank::compute_entry_id_age(&q, hash1), Some(1)); + assert_eq!(Bank::compute_entry_id_age(&q, hash2), Some(0)); + assert_eq!(Bank::compute_entry_id_age(&q, hash3), None); + + // all are below 2 + assert_eq!(Bank::check_entry_id_age(&q, hash2, 2), true); + assert_eq!(Bank::check_entry_id_age(&q, hash1, 2), true); + + // hash2 is most recent with age 0, max is 1, anything equal to max or above is rejected + assert_eq!(Bank::check_entry_id_age(&q, hash2, 1), true); + assert_eq!(Bank::check_entry_id_age(&q, hash1, 1), false); + + // max_age 0 is always rejected + assert_eq!(Bank::check_entry_id_age(&q, hash1, 0), false); + assert_eq!(Bank::check_entry_id_age(&q, hash2, 0), false); + + // hash3 is not in the q + assert_eq!(Bank::check_entry_id_age(&q, hash3, 3), false); + } } diff --git a/src/banking_stage.rs b/src/banking_stage.rs index eb8ad66135..b3e30ed6f4 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -2,7 +2,7 @@ //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. -use bank::Bank; +use bank::{Bank, NUM_TICKS_PER_SECOND}; use bincode::deserialize; use counter::Counter; use entry::Entry; @@ -44,7 +44,7 @@ pub enum Config { impl Default for Config { fn default() -> Config { // TODO: Change this to Tick to enable PoH - Config::Sleep(Duration::from_millis(500)) + Config::Sleep(Duration::from_millis(1000 / NUM_TICKS_PER_SECOND as u64)) } } impl BankingStage { diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index 5a65e559ef..c2bf7009d1 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -2,13 +2,17 @@ extern crate bincode; #[macro_use] extern crate clap; extern crate influx_db_client; +extern crate rand; extern crate rayon; +#[macro_use] +extern crate log; extern crate serde_json; #[macro_use] extern crate solana; use clap::{App, Arg}; use influx_db_client as influxdb; +use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana::client::mk_client; use solana::cluster_info::{ClusterInfo, NodeInfo}; @@ -21,6 +25,7 @@ use solana::service::Service; use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil}; use solana::system_transaction::SystemTransaction; use solana::thin_client::{poll_gossip_for_leader, ThinClient}; +use solana::timing::timestamp; use solana::timing::{duration_as_ms, duration_as_s}; use solana::transaction::Transaction; use solana::wallet::request_airdrop; @@ -173,27 +178,34 @@ fn send_barrier_transaction(barrier_client: &mut ThinClient, last_id: &mut Hash, } } +type SharedTransactions = Arc>>>; fn generate_txs( - shared_txs: &Arc>>>, + shared_txs: &SharedTransactions, source: &[Keypair], dest: &[Keypair], - last_id: &Hash, threads: usize, reclaim: bool, + leader: &NodeInfo, ) { + let mut client = mk_client(leader); + let last_id = client.get_last_id(); + info!("last_id: {} {:?}", last_id, Instant::now()); let tx_count = source.len(); println!("Signing transactions... {} (reclaim={})", tx_count, reclaim); let signing_start = Instant::now(); - let pairs: Vec<_> = source.iter().zip(dest.iter()).collect(); + let pairs: Vec<_> = if !reclaim { + source.iter().zip(dest.iter()).collect() + } else { + dest.iter().zip(source.iter()).collect() + }; let transactions: Vec<_> = pairs .par_iter() .map(|(id, keypair)| { - if !reclaim { - Transaction::system_new(id, keypair.pubkey(), 1, *last_id) - } else { - Transaction::system_new(keypair, id.pubkey(), 1, *last_id) - } + ( + Transaction::system_new(id, keypair.pubkey(), 1, last_id), + timestamp(), + ) }).collect(); let duration = signing_start.elapsed(); @@ -201,10 +213,11 @@ fn generate_txs( let bsps = (tx_count) as f64 / ns as f64; let nsps = ns as f64 / (tx_count) as f64; println!( - "Done. {:.2} thousand signatures per second, {:.2} us per signature, {} ms total time", + "Done. {:.2} thousand signatures per second, {:.2} us per signature, {} ms total time, {}", bsps * 1_000_000_f64, nsps / 1_000_f64, duration_as_ms(&duration), + last_id, ); metrics::submit( influxdb::Point::new("bench-tps") @@ -227,7 +240,7 @@ fn generate_txs( fn do_tx_transfers( exit_signal: &Arc, - shared_txs: &Arc>>>, + shared_txs: &SharedTransactions, leader: &NodeInfo, shared_tx_thread_count: &Arc, total_tx_sent_count: &Arc, @@ -249,7 +262,11 @@ fn do_tx_transfers( let tx_len = txs0.len(); let transfer_start = Instant::now(); for tx in txs0 { - client.transfer_signed(&tx).unwrap(); + let now = timestamp(); + if now > tx.1 && now - tx.1 > 1000 * 30 { + continue; + } + client.transfer_signed(&tx.0).unwrap(); } shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed); @@ -274,28 +291,45 @@ fn do_tx_transfers( } } -fn split_tokens(tokens: i64, per_unit: i64, max_units: usize) -> (usize, i64) { - let total_blocks = tokens / per_unit; - let max_keys_to_fund = cmp::min(total_blocks - 1, max_units as i64); - let blocks_per_unit = total_blocks / (max_keys_to_fund + 1); - (max_keys_to_fund as usize, blocks_per_unit * per_unit) +const MAX_SPENDS_PER_TX: usize = 5; +fn verify_transfer(client: &mut ThinClient, tx: &Transaction) -> bool { + if client.poll_for_signature(&tx.signature).is_err() { + println!("no signature"); + return false; + } + for a in &tx.account_keys[1..] { + if client.poll_get_balance(a).unwrap_or(0) == 0 { + println!( + "no balance {} source bal: {} {:?}", + a, + client.poll_get_balance(&tx.account_keys[0]).unwrap_or(0), + tx + ); + return false; + } + } + true } - +/// fund the dests keys by spending all of the source keys into MAX_SPENDS_PER_TX +/// on every iteration. This allows us to replay the transfers because the source is either empty, +/// or full fn fund_keys(client: &mut ThinClient, source: &Keypair, dests: &[Keypair], tokens: i64) { - let max_per_move = 5; - let total = tokens * (dests.len() as i64 + 1); + let total = tokens * dests.len() as i64; let mut funded: Vec<(&Keypair, i64)> = vec![(source, total)]; let mut notfunded: Vec<&Keypair> = dests.iter().collect(); + println!("funding keys {}", dests.len()); while !notfunded.is_empty() { - let last_id = client.get_last_id(); let mut new_funded: Vec<(&Keypair, i64)> = vec![]; let mut to_fund = vec![]; println!("creating from... {}", funded.len()); for f in &mut funded { - let max_units = cmp::min(notfunded.len(), max_per_move); - let (num, per_unit) = split_tokens(f.1, tokens, max_units); - let start = notfunded.len() - num; + let max_units = cmp::min(notfunded.len(), MAX_SPENDS_PER_TX); + if max_units == 0 { + break; + } + let start = notfunded.len() - max_units; + let per_unit = f.1 / (max_units as i64); let moves: Vec<_> = notfunded[start..] .iter() .map(|k| (k.pubkey(), per_unit)) @@ -307,25 +341,38 @@ fn fund_keys(client: &mut ThinClient, source: &Keypair, dests: &[Keypair], token if !moves.is_empty() { to_fund.push((f.0, moves)); } - f.1 -= per_unit * (num as i64); - assert!(f.1 >= per_unit); } - println!("generating... {}", to_fund.len()); - let to_fund_txs: Vec<_> = to_fund - .par_iter() - .map(|(k, m)| Transaction::system_move_many(k, &m, last_id, 0)) - .collect(); - println!("transfering... {}", to_fund.len()); - to_fund_txs.iter().for_each(|tx| { - let _ = client.transfer_signed(&tx).expect("transfer"); + println!("sending... {}", to_fund.len()); + // try to transfer a few at a time with recent last_id + to_fund.chunks(10_000).for_each(|chunk| { + loop { + let last_id = client.get_last_id(); + println!("generating... {} {}", chunk.len(), last_id); + let mut to_fund_txs: Vec<_> = chunk + .par_iter() + .map(|(k, m)| Transaction::system_move_many(k, &m, last_id, 0)) + .collect(); + // with randomly distributed the failures + // most of the account pairs should have some funding in one of the pairs + // durring generate_tx step + thread_rng().shuffle(&mut to_fund_txs); + println!("transfering... {}", chunk.len()); + to_fund_txs.iter().for_each(|tx| { + let _ = client.transfer_signed(&tx).expect("transfer"); + }); + // randomly sample some of the transfers + thread_rng().shuffle(&mut to_fund_txs); + let max = cmp::min(10, to_fund_txs.len()); + if to_fund_txs[..max] + .iter() + .all(|tx| verify_transfer(client, tx)) + { + break; + } + } }); - println!( - "funded {} total: {} left: {}", - new_funded.len(), - funded.len() + new_funded.len(), - notfunded.len() - ); - funded.append(&mut new_funded); + println!("funded: {} left: {}", new_funded.len(), notfunded.len()); + funded = new_funded; } } @@ -600,7 +647,13 @@ fn main() { let mut rnd = GenKeys::new(seed); println!("Creating {} keypairs...", tx_count * 2); - let keypairs = rnd.gen_n_keypairs(tx_count * 2); + let mut total_keys = 0; + let mut target = tx_count * 2; + while target > 0 { + total_keys += target; + target /= MAX_SPENDS_PER_TX; + } + let gen_keypairs = rnd.gen_n_keypairs(total_keys as i64); let barrier_id = rnd.gen_n_keypairs(1).pop().unwrap(); println!("Get tokens..."); @@ -608,13 +661,19 @@ fn main() { // Sample the first keypair, see if it has tokens, if so then resume // to avoid token loss - let keypair0_balance = client.poll_get_balance(&keypairs[0].pubkey()).unwrap_or(0); + let keypair0_balance = client + .poll_get_balance(&gen_keypairs.last().unwrap().pubkey()) + .unwrap_or(0); if num_tokens_per_account > keypair0_balance { - let extra = (num_tokens_per_account - keypair0_balance) * (keypairs.len() as i64); - airdrop_tokens(&mut client, &leader, &id, extra); - fund_keys(&mut client, &id, &keypairs, num_tokens_per_account); + let extra = num_tokens_per_account - keypair0_balance; + let total = extra * (gen_keypairs.len() as i64); + airdrop_tokens(&mut client, &leader, &id, total); + println!("adding more tokens {}", extra); + fund_keys(&mut client, &id, &gen_keypairs, extra); } + let start = gen_keypairs.len() - (tx_count * 2) as usize; + let keypairs = &gen_keypairs[start..]; airdrop_tokens(&mut barrier_client, &leader, &barrier_id, 1); println!("Get last ID..."); @@ -641,8 +700,7 @@ fn main() { }).unwrap() }).collect(); - let shared_txs: Arc>>> = - Arc::new(RwLock::new(VecDeque::new())); + let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new())); let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0)); let total_tx_sent_count = Arc::new(AtomicUsize::new(0)); @@ -683,9 +741,9 @@ fn main() { &shared_txs, &keypairs[..len], &keypairs[len..], - &last_id, threads, reclaim_tokens_back_to_source_account, + &leader, ); // In sustained mode overlap the transfers with generation // this has higher average performance but lower peak performance @@ -788,16 +846,6 @@ fn converge( #[cfg(test)] mod tests { use super::*; - #[test] - fn test_split_tokens() { - assert_eq!(split_tokens(3, 2, 5), (0, 2)); - assert_eq!(split_tokens(4, 2, 5), (1, 2)); - assert_eq!(split_tokens(5, 2, 5), (1, 2)); - assert_eq!(split_tokens(6, 2, 5), (2, 2)); - assert_eq!(split_tokens(20, 2, 5), (5, 2)); - assert_eq!(split_tokens(30, 2, 5), (5, 4)); - } - #[test] fn test_switch_directions() { assert_eq!(should_switch_directions(20, 0), false); diff --git a/src/budget_transaction.rs b/src/budget_transaction.rs index ca5de244cb..0e3e3b66f1 100644 --- a/src/budget_transaction.rs +++ b/src/budget_transaction.rs @@ -205,7 +205,6 @@ impl BudgetTransaction for Transaction { fn vote(&self) -> Option<(Pubkey, Vote, Hash)> { if self.instructions.len() > 1 { - error!("expecting only 1 Instruction per vote"); None } else if let Some(Instruction::NewVote(vote)) = self.instruction(0) { Some((self.account_keys[0], vote, self.last_id)) diff --git a/src/entry_writer.rs b/src/entry_writer.rs index d4b615185d..851da94c2b 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -2,24 +2,15 @@ //! writes entries to the given writer, which is typically a file or //! stdout, and then sends the Entry to its output channel. -use bank::Bank; use bincode; use entry::Entry; use std::io::{self, BufRead, Error, ErrorKind, Write}; use std::mem::size_of; -pub struct EntryWriter<'a, W> { - bank: &'a Bank, - writer: W, -} +pub struct EntryWriter {} -impl<'a, W: Write> EntryWriter<'a, W> { - /// Create a new Tpu that wraps the given Bank. - pub fn new(bank: &'a Bank, writer: W) -> Self { - EntryWriter { bank, writer } - } - - fn write_entry(writer: &mut W, entry: &Entry) -> io::Result<()> { +impl EntryWriter { + fn write_entry(writer: &mut W, entry: &Entry) -> io::Result<()> { let entry_bytes = bincode::serialize(&entry).map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?; @@ -32,7 +23,7 @@ impl<'a, W: Write> EntryWriter<'a, W> { writer.flush() } - pub fn write_entries(writer: &mut W, entries: I) -> io::Result<()> + pub fn write_entries(writer: &mut W, entries: I) -> io::Result<()> where I: IntoIterator, { @@ -41,20 +32,6 @@ impl<'a, W: Write> EntryWriter<'a, W> { } Ok(()) } - - fn write_and_register_entry(&mut self, entry: &Entry) -> io::Result<()> { - trace!("write_and_register_entry entry"); - self.bank.register_entry_id(&entry.id); - - Self::write_entry(&mut self.writer, entry) - } - - pub fn write_and_register_entries(&mut self, entries: &[Entry]) -> io::Result<()> { - for entry in entries { - self.write_and_register_entry(&entry)?; - } - Ok(()) - } } struct EntryReader { diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index 9f84f582cd..6f444c42f8 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -53,7 +53,7 @@ impl PohRecorder { // This guarantees PoH order and Entry production and banks LastId queue is the same. let mut poh = self.poh.lock().unwrap(); let tick = poh.record(mixin); - self.bank.register_entry_id(&tick.id); + assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function"); let entry = Entry { num_hashes: tick.num_hashes, id: tick.id, @@ -71,6 +71,7 @@ mod tests { use mint::Mint; use std::sync::mpsc::channel; use std::sync::Arc; + use system_transaction::test_tx; #[test] fn test_poh() { @@ -81,7 +82,8 @@ mod tests { //send some data let h1 = hash(b"hello world!"); - assert!(poh_recorder.record(h1, vec![]).is_ok()); + let tx = test_tx(); + assert!(poh_recorder.record(h1, vec![tx]).is_ok()); assert!(poh_recorder.tick().is_ok()); //get some events diff --git a/src/system_transaction.rs b/src/system_transaction.rs index 2e71966424..ead09195a1 100644 --- a/src/system_transaction.rs +++ b/src/system_transaction.rs @@ -135,7 +135,7 @@ impl SystemTransaction for Transaction { Instruction { program_ids_index: 0, userdata: serialize(&spend).unwrap(), - accounts: vec![0, i as u8], + accounts: vec![0, i as u8 + 1], } }).collect(); let to_keys: Vec<_> = moves.iter().map(|(to_key, _)| *to_key).collect(); @@ -215,4 +215,19 @@ mod tests { let sign_data0b = tx0.get_sign_data(); assert_ne!(sign_data0a, sign_data0b); } + #[test] + fn test_move_many() { + let from = Keypair::new(); + let t1 = Keypair::new(); + let t2 = Keypair::new(); + let moves = vec![(t1.pubkey(), 1), (t2.pubkey(), 2)]; + + let tx = Transaction::system_move_many(&from, &moves, Default::default(), 0); + assert_eq!(tx.account_keys[0], from.pubkey()); + assert_eq!(tx.account_keys[1], t1.pubkey()); + assert_eq!(tx.account_keys[2], t2.pubkey()); + assert_eq!(tx.instructions.len(), 2); + assert_eq!(tx.instructions[0].accounts, vec![0, 1]); + assert_eq!(tx.instructions[1].accounts, vec![0, 2]); + } } diff --git a/tests/multinode.rs b/tests/multinode.rs index e2cf15530e..8ba93159b1 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -18,8 +18,10 @@ use solana::ncp::Ncp; use solana::result; use solana::service::Service; use solana::signature::{Keypair, KeypairUtil}; +use solana::system_transaction::SystemTransaction; use solana::thin_client::ThinClient; use solana::timing::{duration_as_ms, duration_as_s}; +use solana::transaction::Transaction; use solana::window::{default_window, WINDOW_SIZE}; use solana_program_interface::pubkey::Pubkey; use std::collections::{HashSet, VecDeque}; @@ -1469,10 +1471,9 @@ fn send_tx_and_retry_get_balance( let mut client = mk_client(leader); trace!("getting leader last_id"); let last_id = client.get_last_id(); + let tx = Transaction::system_new(&alice.keypair(), *bob_pubkey, transfer_amount, last_id); info!("executing leader transfer"); - let _sig = client - .transfer(transfer_amount, &alice.keypair(), *bob_pubkey, &last_id) - .unwrap(); + let _res = client.retry_transfer_signed(&tx, 30); retry_get_balance(&mut client, bob_pubkey, expected) }