Enable mt-bank (#1368)

* Enable mt-bank

* cleanup and interleaving lock tests
This commit is contained in:
anatoly yakovenko 2018-10-04 13:15:54 -07:00 committed by GitHub
parent d901767b54
commit 1a68807ad9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 290 additions and 94 deletions

View File

@ -5,7 +5,6 @@ extern crate solana;
extern crate test;
use bincode::serialize;
use rayon::prelude::*;
use solana::bank::*;
use solana::hash::hash;
use solana::mint::Mint;
@ -21,7 +20,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {
// Create transactions between unrelated parties.
let transactions: Vec<_> = (0..4096)
.into_par_iter()
.into_iter()
.map(|i| {
// Seed the 'from' account.
let rando0 = Keypair::new();
@ -32,7 +31,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {
mint.last_id(),
0,
);
assert!(bank.process_transaction(&tx).is_ok());
assert_eq!(bank.process_transaction(&tx), Ok(()));
// Seed the 'to' account and a cell for its signature.
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
@ -40,7 +39,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {
let rando1 = Keypair::new();
let tx = Transaction::system_move(&rando0, rando1.pubkey(), 1, last_id, 0);
assert!(bank.process_transaction(&tx).is_ok());
assert_eq!(bank.process_transaction(&tx), Ok(()));
// Finally, return the transaction to the benchmark.
tx

View File

@ -111,8 +111,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
#[bench]
fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
//use solana::logger;
//logger::setup();
let progs = 5;
let txes = 1000 * NUM_THREADS;
let mint_total = 1_000_000_000_000;

View File

@ -15,14 +15,16 @@ use ledger::Block;
use log::Level;
use mint::Mint;
use payment_plan::Payment;
use signature::{Keypair, Signature};
use poh_recorder::PohRecorder;
use signature::Keypair;
use signature::Signature;
use solana_program_interface::account::{Account, KeyedAccount};
use solana_program_interface::pubkey::Pubkey;
use std;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::sync::{Mutex, RwLock};
use std::time::Instant;
use storage_program::StorageProgram;
use system_program::SystemProgram;
@ -39,13 +41,16 @@ use window::WINDOW_SIZE;
/// but requires clients to update its `last_id` more frequently. Raising the value
/// lengthens the time a client must wait to be certain a missing transaction will
/// not be processed by the network.
pub const MAX_ENTRY_IDS: usize = 1024 * 16;
pub const MAX_ENTRY_IDS: usize = 1024 * 32;
pub const VERIFY_BLOCK_SIZE: usize = 16;
/// Reasons a transaction might be rejected.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BankError {
/// This Pubkey is being processed in another transaction
AccountInUse,
/// Attempt to debit from `Pubkey`, but no found no record of a prior credit.
AccountNotFound,
@ -85,6 +90,9 @@ pub enum BankError {
/// The program returned an error
ProgramRuntimeError(u8),
/// Recoding into PoH failed
RecordFailure,
}
pub type Result<T> = result::Result<T, BankError>;
@ -94,13 +102,16 @@ type SignatureStatusMap = HashMap<Signature, Result<()>>;
struct ErrorCounters {
account_not_found_validator: usize,
account_not_found_leader: usize,
account_in_use: usize,
}
/// The state of all accounts and contracts after processing its entries.
pub struct Bank {
/// A map of account public keys to the balance in that account.
accounts: RwLock<HashMap<Pubkey, Account>>,
/// set of accounts which are currently in the pipeline
account_locks: Mutex<HashSet<Pubkey>>,
/// A FIFO queue of `last_id` items, where each item is a set of signatures
/// that have been processed using that `last_id`. Rejected `last_id`
/// values are so old that the `last_id` has been pulled out of the queue.
@ -129,6 +140,7 @@ impl Default for Bank {
fn default() -> Self {
Bank {
accounts: RwLock::new(HashMap::new()),
account_locks: Mutex::new(HashSet::new()),
last_ids: RwLock::new(VecDeque::new()),
last_ids_sigs: RwLock::new(HashMap::new()),
transaction_count: AtomicUsize::new(0),
@ -200,18 +212,23 @@ impl Bank {
}
}
fn reserve_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) -> Result<()> {
if let Some(entry) = self
.last_ids_sigs
.write()
.expect("'last_ids' read lock in reserve_signature_with_last_id")
.get_mut(last_id)
{
return Self::reserve_signature(&mut entry.0, signature);
fn reserve_signature_with_last_id(
last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64)>,
last_id: &Hash,
sig: &Signature,
) -> Result<()> {
if let Some(entry) = last_ids_sigs.get_mut(last_id) {
return Self::reserve_signature(&mut entry.0, sig);
}
Err(BankError::LastIdNotFound)
}
#[cfg(test)]
fn reserve_signature_with_last_id_test(&self, sig: &Signature, last_id: &Hash) -> Result<()> {
let mut last_ids_sigs = self.last_ids_sigs.write().unwrap();
Self::reserve_signature_with_last_id(&mut last_ids_sigs, last_id, sig)
}
fn update_signature_status(
signatures: &mut SignatureStatusMap,
signature: &Signature,
@ -222,19 +239,25 @@ impl Bank {
}
fn update_signature_status_with_last_id(
&self,
last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64)>,
signature: &Signature,
result: &Result<()>,
last_id: &Hash,
) {
if let Some(entry) = self.last_ids_sigs.write().unwrap().get_mut(last_id) {
if let Some(entry) = last_ids_sigs.get_mut(last_id) {
Self::update_signature_status(&mut entry.0, signature, result);
}
}
fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
let mut last_ids = self.last_ids_sigs.write().unwrap();
for (i, tx) in txs.iter().enumerate() {
self.update_signature_status_with_last_id(&tx.signature, &res[i], &tx.last_id);
Self::update_signature_status_with_last_id(
&mut last_ids,
&tx.signature,
&res[i],
&tx.last_id,
);
}
}
@ -277,7 +300,8 @@ impl Bank {
/// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method.
pub fn process_transaction(&self, tx: &Transaction) -> Result<()> {
match self.process_transactions(&[tx.clone()])[0] {
let txs = vec![tx.clone()];
match self.process_transactions(&txs)[0] {
Err(ref e) => {
info!("process_transaction error: {:?}", e);
Err((*e).clone())
@ -285,11 +309,38 @@ impl Bank {
Ok(_) => Ok(()),
}
}
fn lock_account(
account_locks: &mut HashSet<Pubkey>,
keys: &[Pubkey],
error_counters: &mut ErrorCounters,
) -> Result<()> {
// Copy all the accounts
for k in keys {
if account_locks.contains(k) {
error_counters.account_in_use += 1;
return Err(BankError::AccountInUse);
}
}
for k in keys {
account_locks.insert(*k);
}
Ok(())
}
fn unlock_account(tx: &Transaction, result: &Result<()>, account_locks: &mut HashSet<Pubkey>) {
match result {
Err(BankError::AccountInUse) => (),
_ => for k in &tx.account_keys {
account_locks.remove(k);
},
}
}
fn load_account(
&self,
tx: &Transaction,
accounts: &HashMap<Pubkey, Account>,
last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64)>,
error_counters: &mut ErrorCounters,
) -> Result<Vec<Account>> {
// Copy all the accounts
@ -310,21 +361,54 @@ impl Bank {
.collect();
// There is no way to predict what contract will execute without an error
// If a fee can pay for execution then the contract will be scheduled
self.reserve_signature_with_last_id(&tx.signature, &tx.last_id)?;
let err =
Self::reserve_signature_with_last_id(last_ids_sigs, &tx.last_id, &tx.signature);
err?;
called_accounts[0].tokens -= tx.fee;
Ok(called_accounts)
}
}
/// This function will prevent multiple threads from modifying the same account state at the
/// same time
#[must_use]
fn lock_accounts(&self, txs: &[Transaction]) -> Vec<Result<()>> {
let mut account_locks = self.account_locks.lock().unwrap();
let mut error_counters = ErrorCounters::default();
let rv = txs
.iter()
.map(|tx| Self::lock_account(&mut account_locks, &tx.account_keys, &mut error_counters))
.collect();
inc_new_counter_info!(
"bank-process_transactions-account_in_use",
error_counters.account_in_use
);
rv
}
/// Once accounts are unlocked, new transactions that modify that state can enter the pipeline
fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) {
debug!("bank unlock accounts");
let mut account_locks = self.account_locks.lock().unwrap();
txs.iter()
.zip(results.iter())
.for_each(|(tx, result)| Self::unlock_account(tx, result, &mut account_locks));
}
fn load_accounts(
&self,
txs: &[Transaction],
accounts: &HashMap<Pubkey, Account>,
results: Vec<Result<()>>,
error_counters: &mut ErrorCounters,
) -> Vec<Result<Vec<Account>>> {
) -> Vec<(Result<Vec<Account>>)> {
let accounts = self.accounts.read().unwrap();
let mut last_sigs = self.last_ids_sigs.write().unwrap();
txs.iter()
.map(|tx| self.load_account(tx, accounts, error_counters))
.collect()
.zip(results.into_iter())
.map(|etx| match etx {
(tx, Ok(())) => self.load_account(tx, &accounts, &mut last_sigs, error_counters),
(_, Err(e)) => Err(e),
}).collect()
}
pub fn verify_transaction(
@ -477,11 +561,12 @@ impl Bank {
}
pub fn store_accounts(
&self,
txs: &[Transaction],
res: &[Result<()>],
loaded: &[Result<Vec<Account>>],
accounts: &mut HashMap<Pubkey, Account>,
) {
let mut accounts = self.accounts.write().unwrap();
for (i, racc) in loaded.iter().enumerate() {
if res[i].is_err() || racc.is_err() {
continue;
@ -501,22 +586,80 @@ impl Bank {
}
}
pub fn process_and_record_transactions(
&self,
txs: &[Transaction],
poh: &PohRecorder,
) -> Result<()> {
let now = Instant::now();
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let locked_accounts = self.lock_accounts(txs);
let lock_time = now.elapsed();
let now = Instant::now();
let results = self.execute_and_commit_transactions(txs, locked_accounts);
let process_time = now.elapsed();
let now = Instant::now();
self.record_transactions(txs, &results, poh)?;
let record_time = now.elapsed();
let now = Instant::now();
// Once the accounts are unlocked new transactions can enter the pipeline to process them
self.unlock_accounts(&txs, &results);
let unlock_time = now.elapsed();
debug!(
"lock: {}us process: {}us record: {}us unlock: {}us txs_len={}",
duration_as_us(&lock_time),
duration_as_us(&process_time),
duration_as_us(&record_time),
duration_as_us(&unlock_time),
txs.len(),
);
Ok(())
}
fn record_transactions(
&self,
txs: &[Transaction],
results: &[Result<()>],
poh: &PohRecorder,
) -> Result<()> {
let processed_transactions: Vec<_> = results
.iter()
.zip(txs.iter())
.filter_map(|(r, x)| match r {
Ok(_) => Some(x.clone()),
Err(ref e) => {
debug!("process transaction failed {:?}", e);
None
}
}).collect();
// unlock all the accounts with errors which are filtered by the above `filter_map`
if !processed_transactions.is_empty() {
let hash = Transaction::hash(&processed_transactions);
debug!("processed ok: {} {}", processed_transactions.len(), hash);
// record and unlock will unlock all the successfull transactions
poh.record(hash, processed_transactions).map_err(|e| {
warn!("record failure: {:?}", e);
BankError::RecordFailure
})?;
}
Ok(())
}
/// Process a batch of transactions.
#[must_use]
pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
pub fn execute_and_commit_transactions(
&self,
txs: &[Transaction],
locked_accounts: Vec<Result<()>>,
) -> Vec<Result<()>> {
debug!("processing transactions: {}", txs.len());
// TODO right now a single write lock is held for the duration of processing all the
// transactions
// To break this lock each account needs to be locked to prevent concurrent access
let mut accounts = self.accounts.write().unwrap();
let txs_len = txs.len();
let mut error_counters = ErrorCounters::default();
let now = Instant::now();
let mut loaded_accounts = self.load_accounts(&txs, &accounts, &mut error_counters);
let mut loaded_accounts = self.load_accounts(txs, locked_accounts, &mut error_counters);
let load_elapsed = now.elapsed();
let now = Instant::now();
let res: Vec<_> = loaded_accounts
let executed: Vec<Result<()>> = loaded_accounts
.iter_mut()
.zip(txs.iter())
.map(|(acc, tx)| match acc {
@ -525,19 +668,20 @@ impl Bank {
}).collect();
let execution_elapsed = now.elapsed();
let now = Instant::now();
Self::store_accounts(&txs, &res, &loaded_accounts, &mut accounts);
self.update_transaction_statuses(&txs, &res);
self.store_accounts(txs, &executed, &loaded_accounts);
// once committed there is no way to unroll
let write_elapsed = now.elapsed();
debug!(
"load: {}us execution: {}us write: {}us txs_len={}",
"load: {}us execute: {}us store: {}us txs_len={}",
duration_as_us(&load_elapsed),
duration_as_us(&execution_elapsed),
duration_as_us(&write_elapsed),
txs_len
txs.len(),
);
self.update_transaction_statuses(txs, &executed);
let mut tx_count = 0;
let mut err_count = 0;
for r in &res {
for r in &executed {
if r.is_ok() {
tx_count += 1;
} else {
@ -562,14 +706,21 @@ impl Bank {
error_counters.account_not_found_leader
);
}
inc_new_counter_info!("bank-process_transactions-error_count", err_count);
}
let cur_tx_count = self.transaction_count.load(Ordering::Relaxed);
if ((cur_tx_count + tx_count) & !(262_144 - 1)) > cur_tx_count & !(262_144 - 1) {
info!("accounts.len: {}", accounts.len());
}
self.transaction_count
.fetch_add(tx_count, Ordering::Relaxed);
res
inc_new_counter_info!("bank-process_transactions-txs", tx_count);
executed
}
#[must_use]
pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
let locked_accounts = self.lock_accounts(txs);
let results = self.execute_and_commit_transactions(txs, locked_accounts);
self.unlock_accounts(txs, &results);
results
}
pub fn process_entry(&self, entry: &Entry) -> Result<()> {
@ -789,9 +940,11 @@ mod tests {
use hash::hash;
use ledger;
use logger;
use signature::Keypair;
use signature::{GenKeys, KeypairUtil};
use std;
use std::io::{BufReader, Cursor, Seek, SeekFrom};
use system_transaction::SystemTransaction;
use transaction::Instruction;
#[test]
@ -818,13 +971,37 @@ mod tests {
assert_eq!(bank.transaction_count(), 2);
}
#[test]
fn test_one_source_two_tx_one_batch() {
let mint = Mint::new(1);
let key1 = Keypair::new().pubkey();
let key2 = Keypair::new().pubkey();
let bank = Bank::new(&mint);
assert_eq!(bank.last_id(), mint.last_id());
let t1 = Transaction::system_move(&mint.keypair(), key1, 1, mint.last_id(), 0);
let t2 = Transaction::system_move(&mint.keypair(), key2, 1, mint.last_id(), 0);
let res = bank.process_transactions(&vec![t1.clone(), t2.clone()]);
assert_eq!(res.len(), 2);
assert_eq!(res[0], Ok(()));
assert_eq!(res[1], Err(BankError::AccountInUse));
assert_eq!(bank.get_balance(&mint.pubkey()), 0);
assert_eq!(bank.get_balance(&key1), 1);
assert_eq!(bank.get_balance(&key2), 0);
assert_eq!(bank.get_signature(&t1.last_id, &t1.signature), Some(Ok(())));
// TODO: Transactions that fail to pay a fee could be dropped silently
assert_eq!(
bank.get_signature(&t2.last_id, &t2.signature),
Some(Err(BankError::AccountInUse))
);
}
#[test]
fn test_one_tx_two_out_atomic_fail() {
let mint = Mint::new(1);
let key1 = Keypair::new().pubkey();
let key2 = Keypair::new().pubkey();
let bank = Bank::new(&mint);
let spend = SystemProgram::Move { tokens: 1 };
let instructions = vec![
Instruction {
@ -889,7 +1066,7 @@ mod tests {
);
let res = bank.process_transactions(&vec![t1.clone()]);
assert_eq!(res.len(), 1);
assert!(res[0].is_ok());
assert_eq!(res[0], Ok(()));
assert_eq!(bank.get_balance(&mint.pubkey()), 0);
assert_eq!(bank.get_balance(&key1), 1);
assert_eq!(bank.get_balance(&key2), 1);
@ -931,7 +1108,7 @@ mod tests {
let res = bank.process_transaction(&tx);
// Result failed, but signature is registered
assert!(!res.is_ok());
assert!(res.is_err());
assert!(bank.has_signature(&signature));
assert_matches!(
bank.get_signature_status(&signature),
@ -992,12 +1169,12 @@ mod tests {
let mint = Mint::new(1);
let bank = Bank::new(&mint);
let signature = Signature::default();
assert!(
bank.reserve_signature_with_last_id(&signature, &mint.last_id())
.is_ok()
assert_eq!(
bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()),
Ok(())
);
assert_eq!(
bank.reserve_signature_with_last_id(&signature, &mint.last_id()),
bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()),
Err(BankError::DuplicateSignature)
);
}
@ -1007,12 +1184,12 @@ mod tests {
let mint = Mint::new(1);
let bank = Bank::new(&mint);
let signature = Signature::default();
bank.reserve_signature_with_last_id(&signature, &mint.last_id())
bank.reserve_signature_with_last_id_test(&signature, &mint.last_id())
.unwrap();
bank.clear_signatures();
assert!(
bank.reserve_signature_with_last_id(&signature, &mint.last_id())
.is_ok()
assert_eq!(
bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()),
Ok(())
);
}
@ -1021,9 +1198,9 @@ mod tests {
let mint = Mint::new(1);
let bank = Bank::new(&mint);
let signature = Signature::default();
bank.reserve_signature_with_last_id(&signature, &mint.last_id())
bank.reserve_signature_with_last_id_test(&signature, &mint.last_id())
.expect("reserve signature");
assert!(bank.get_signature_status(&signature).is_ok());
assert_eq!(bank.get_signature_status(&signature), Ok(()));
}
#[test]
@ -1031,7 +1208,7 @@ mod tests {
let mint = Mint::new(1);
let bank = Bank::new(&mint);
let signature = Signature::default();
bank.reserve_signature_with_last_id(&signature, &mint.last_id())
bank.reserve_signature_with_last_id_test(&signature, &mint.last_id())
.expect("reserve signature");
assert!(bank.has_signature(&signature));
}
@ -1047,7 +1224,7 @@ mod tests {
}
// Assert we're no longer able to use the oldest entry ID.
assert_eq!(
bank.reserve_signature_with_last_id(&signature, &mint.last_id()),
bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()),
Err(BankError::LastIdNotFound)
);
}
@ -1100,7 +1277,7 @@ mod tests {
// Now ensure the TX is accepted despite pointing to the ID of an empty entry.
bank.process_entries(&[entry]).unwrap();
assert!(bank.process_transaction(&tx).is_ok());
assert_eq!(bank.process_transaction(&tx), Ok(()));
}
#[test]
@ -1116,12 +1293,19 @@ mod tests {
mint: &Mint,
keypairs: &[Keypair],
) -> impl Iterator<Item = Entry> {
let hash = mint.last_id();
let transactions: Vec<_> = keypairs
.iter()
.map(|keypair| Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, hash))
.collect();
let entries = ledger::next_entries(&hash, 0, transactions);
let mut hash = mint.last_id();
let mut entries: Vec<Entry> = vec![];
for k in keypairs {
let txs = vec![Transaction::system_new(
&mint.keypair(),
k.pubkey(),
1,
hash,
)];
let mut e = ledger::next_entries(&hash, 0, txs);
entries.append(&mut e);
hash = entries.last().unwrap().id;
}
entries.into_iter()
}
@ -1265,4 +1449,37 @@ mod tests {
def_bank.set_finality(90);
assert_eq!(def_bank.finality(), 90);
}
#[test]
fn test_interleaving_locks() {
let mint = Mint::new(3);
let bank = Bank::new(&mint);
let alice = Keypair::new();
let bob = Keypair::new();
let tx1 = Transaction::system_new(&mint.keypair(), alice.pubkey(), 1, mint.last_id());
let pay_alice = vec![tx1];
let locked_alice = bank.lock_accounts(&pay_alice);
let results_alice = bank.execute_and_commit_transactions(&pay_alice, locked_alice);
assert_eq!(results_alice[0], Ok(()));
// try executing an interleaved transfer twice
assert_eq!(
bank.transfer(1, &mint.keypair(), bob.pubkey(), mint.last_id()),
Err(BankError::AccountInUse)
);
// the second time shoudl fail as well
// this verifies that `unlock_accounts` doesn't unlock `AccountInUse` accounts
assert_eq!(
bank.transfer(1, &mint.keypair(), bob.pubkey(), mint.last_id()),
Err(BankError::AccountInUse)
);
bank.unlock_accounts(&pay_alice, &results_alice);
assert_matches!(
bank.transfer(2, &mint.keypair(), bob.pubkey(), mint.last_id()),
Ok(_)
);
}
}

View File

@ -26,7 +26,7 @@ use timing;
use transaction::Transaction;
// number of threads is 1 until mt bank is ready
pub const NUM_THREADS: usize = 1;
pub const NUM_THREADS: usize = 10;
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {
@ -165,24 +165,8 @@ impl BankingStage {
while chunk_start != transactions.len() {
let chunk_end = chunk_start + Entry::num_will_fit(&transactions[chunk_start..]);
let results = bank.process_transactions(&transactions[chunk_start..chunk_end]);
bank.process_and_record_transactions(&transactions[chunk_start..chunk_end], poh)?;
let processed_transactions: Vec<_> = transactions[chunk_start..chunk_end]
.into_iter()
.enumerate()
.filter_map(|(i, x)| match results[i] {
Ok(_) => Some(x.clone()),
Err(ref e) => {
debug!("process transaction failed {:?}", e);
None
}
}).collect();
if !processed_transactions.is_empty() {
let hash = Transaction::hash(&processed_transactions);
debug!("processed ok: {} {}", processed_transactions.len(), hash);
poh.record(hash, processed_transactions)?;
}
chunk_start = chunk_end;
}
debug!("done process_transactions");
@ -403,11 +387,9 @@ mod tests {
// the account balance below zero before the credit is added.
let bank = Bank::new(&mint);
for entry in entries {
assert!(
bank.process_transactions(&entry.transactions)
.into_iter()
.all(|x| x.is_ok())
);
bank.process_transactions(&entry.transactions)
.iter()
.for_each(|x| assert_eq!(*x, Ok(())));
}
assert_eq!(bank.get_balance(&alice.pubkey()), 1);
}

View File

@ -285,7 +285,7 @@ pub mod tests {
// vote should be valid
let blob = &vote_blob.unwrap()[0];
let tx = deserialize(&(blob.read().unwrap().data)).unwrap();
assert!(bank.process_transaction(&tx).is_ok());
assert_eq!(bank.process_transaction(&tx), Ok(()));
}
#[test]