From 5fb8baed045d6fe93cbc70a9c6a0df215bbc4c81 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 19 Apr 2019 07:29:07 -0600 Subject: [PATCH] Process async BankClient transactions in batches (#3738) * Process async transactions in batches This aims to process transactions at least as fast as LocalCluster * Add benchmark --- runtime/benches/bank.rs | 54 ++++++++++++++++++++++++-------------- runtime/src/bank_client.rs | 40 +++++++++++++++++++++++----- 2 files changed, 67 insertions(+), 27 deletions(-) diff --git a/runtime/benches/bank.rs b/runtime/benches/bank.rs index a8e705ff58..5a88cf0d3c 100644 --- a/runtime/benches/bank.rs +++ b/runtime/benches/bank.rs @@ -3,20 +3,19 @@ extern crate test; use solana_runtime::bank::*; +use solana_runtime::bank_client::BankClient; +use solana_sdk::client::AsyncClient; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}; +use solana_sdk::transaction::Transaction; use test::Bencher; -#[bench] -fn bench_process_transaction(bencher: &mut Bencher) { - let (genesis_block, mint_keypair) = GenesisBlock::new(100_000_000); - let bank = Bank::new(&genesis_block); - - // Create transactions between unrelated parties. - let transactions: Vec<_> = (0..4096) +// Create transactions between unrelated parties. +pub fn create_sample_transactions(bank: &Bank, mint_keypair: &Keypair) -> Vec { + (0..4096) .into_iter() .map(|_| { // Seed the 'from' account. @@ -32,22 +31,22 @@ fn bench_process_transaction(bencher: &mut Bencher) { // Seed the 'to' account and a cell for its signature. let rando1 = Keypair::new(); - let tx = system_transaction::transfer( - &rando0, - &rando1.pubkey(), - 1, - bank.last_blockhash(), - 0, - ); - assert_eq!(bank.process_transaction(&tx), Ok(())); - - // Finally, return the transaction to the benchmark. - tx + system_transaction::transfer(&rando0, &rando1.pubkey(), 1, bank.last_blockhash(), 0) }) - .collect(); + .collect() +} + +#[bench] +fn bench_process_transaction(bencher: &mut Bencher) { + let (genesis_block, mint_keypair) = GenesisBlock::new(100_000_000); + let bank = Bank::new(&genesis_block); + let transactions = create_sample_transactions(&bank, &mint_keypair); + + // Run once to create all the 'to' accounts. + let results = bank.process_transactions(&transactions); + assert!(results.iter().all(Result::is_ok)); let mut id = bank.last_blockhash(); - for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) { bank.register_tick(&id); id = hash(&id.as_ref()) @@ -60,3 +59,18 @@ fn bench_process_transaction(bencher: &mut Bencher) { assert!(results.iter().all(Result::is_ok)); }) } + +#[bench] +fn bench_bank_client(bencher: &mut Bencher) { + let (genesis_block, mint_keypair) = GenesisBlock::new(100_000_000); + let bank = Bank::new(&genesis_block); + let transactions = create_sample_transactions(&bank, &mint_keypair); + + bencher.iter(|| { + let bank = Bank::new(&genesis_block); + let bank_client = BankClient::new(bank); + for transaction in transactions.clone() { + bank_client.async_send_transaction(transaction).unwrap(); + } + }) +} diff --git a/runtime/src/bank_client.rs b/runtime/src/bank_client.rs index d75e1edaf7..78f4a34bce 100644 --- a/runtime/src/bank_client.rs +++ b/runtime/src/bank_client.rs @@ -1,5 +1,5 @@ use crate::bank::Bank; -use solana_sdk::client::{AsyncClient, SyncClient}; +use solana_sdk::client::{AsyncClient, Client, SyncClient}; use solana_sdk::hash::Hash; use solana_sdk::instruction::Instruction; use solana_sdk::message::Message; @@ -10,17 +10,22 @@ use solana_sdk::system_instruction; use solana_sdk::transaction::{self, Transaction}; use solana_sdk::transport::Result; use std::io; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::Arc; +use std::thread::Builder; pub struct BankClient { - bank: Bank, + bank: Arc, + transaction_sender: Sender, } +impl Client for BankClient {} + impl AsyncClient for BankClient { fn async_send_transaction(&self, transaction: Transaction) -> io::Result { - // Ignore the result. Client must use get_signature_status() instead. - let _ = self.bank.process_transaction(&transaction); - - Ok(transaction.signatures.get(0).cloned().unwrap_or_default()) + let signature = transaction.signatures.get(0).cloned().unwrap_or_default(); + self.transaction_sender.send(transaction).unwrap(); + Ok(signature) } fn async_send_message( @@ -104,8 +109,29 @@ impl SyncClient for BankClient { } impl BankClient { + fn run(bank: &Bank, transaction_receiver: Receiver) { + while let Ok(tx) = transaction_receiver.recv() { + let mut transactions = vec![tx]; + while let Ok(tx) = transaction_receiver.try_recv() { + transactions.push(tx); + } + let _ = bank.process_transactions(&transactions); + } + } + pub fn new(bank: Bank) -> Self { - Self { bank } + let bank = Arc::new(bank); + let (transaction_sender, transaction_receiver) = channel(); + let thread_bank = bank.clone(); + let bank = bank.clone(); + Builder::new() + .name("solana-bank-client".to_string()) + .spawn(move || Self::run(&thread_bank, transaction_receiver)) + .unwrap(); + Self { + bank, + transaction_sender, + } } }