diff --git a/src/bank.rs b/src/bank.rs index 23932aace2..164d75c8e5 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -18,6 +18,8 @@ use std::result; use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering}; use std::sync::RwLock; use transaction::{Instruction, Plan, Transaction}; +use std::time::Instant; +use std::cell::Cell; /// The number of most recent `last_id` values that the bank will track the signatures /// of. Once the bank discards a `last_id`, it will reject any transactions that use @@ -57,8 +59,10 @@ pub type Result = result::Result; /// The state of all accounts and contracts after processing its entries. pub struct Bank { /// A map of account public keys to the balance in that account. - balances: RwLock>, + balances: RwLock>, //balances: RwLock>>, + //balances: RwLock>, + /// A map of smart contract transaction signatures to what remains of its payment /// plan. Each transaction that targets the plan should cause it to be reduced. @@ -97,7 +101,7 @@ impl Bank { last_time: RwLock::new(Utc.timestamp(0, 0)), transaction_count: AtomicUsize::new(0), }; - bank.apply_payment(deposit); + bank.apply_payment(deposit, &mut bank.balances.write().unwrap()); bank } @@ -112,31 +116,44 @@ impl Bank { bank } - /// Commit funds to the `payment.to` party. - fn apply_payment(&self, payment: &Payment) { + fn apply_payment(&self, payment: &Payment, balances: &mut HashMap) { + if balances.contains_key(&payment.to) { + *balances.get_mut(&payment.to).unwrap() += payment.tokens; + } else { + balances.insert(payment.to, payment.tokens); + } + } + + /// Commit funds to the 'to' party. + /*fn apply_payment(&self, payment: &Payment) { // First we check balances with a read lock to maximize potential parallelization. if self.balances .read() .expect("'balances' read lock in apply_payment") .contains_key(&payment.to) { - let bals = self.balances.read().expect("'balances' read lock"); + let mut bals = self.balances.write().expect("'balances' read lock"); //bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); - *bals[&payment.to].write().unwrap() += payment.tokens; + // *bals[&payment.to].write().unwrap() += payment.tokens; + let x = bals.get_mut(&payment.to).unwrap(); + *x += payment.tokens; //trace!("updated balance to {}", bals[&payment.to].load(Ordering::Relaxed)); } else { // Now we know the key wasn't present a nanosecond ago, but it might be there // by the time we aquire a write lock, so we'll have to check again. let mut bals = self.balances.write().expect("'balances' write lock"); if bals.contains_key(&payment.to) { - *bals[&payment.to].write().unwrap() += payment.tokens; + let x= bals.get_mut(&payment.to).unwrap(); + *x += payment.tokens; + // *bals[&payment.to].write().unwrap() += payment.tokens; //bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); } else { //bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize)); - bals.insert(payment.to, RwLock::new(payment.tokens)); + //bals.insert(payment.to, RwLock::new(payment.tokens)); + bals.insert(payment.to, payment.tokens); } } - } + }*/ /// Return the last entry ID registered. pub fn last_id(&self) -> Hash { @@ -209,21 +226,26 @@ impl Bank { last_ids.push_back((*last_id, RwLock::new(HashSet::new()))); } - pub fn apply_debits(&self, tr: &Transaction) -> Result<()> { + pub fn apply_debits(&self, tr: &Transaction, bals: &mut HashMap) -> Result<()> { - let bals = self.balances.read().unwrap(); + //let mut bals = self.balances.write().unwrap(); // Hold a write lock before the condition check, so that a debit can't occur // between checking the balance and the withdraw. - let option = bals.get(&tr.from); + let mut option = bals.get_mut(&tr.from); if option.is_none() { return Err(BankError::AccountNotFound(tr.from)); } - let mut bal = option.unwrap().write().unwrap(); + //let mut bal = option.unwrap().write().unwrap(); + let mut bal = option.unwrap(); self.reserve_signature_with_last_id(&tr.sig, &tr.last_id)?; if let Instruction::NewContract(contract) = &tr.instruction { + if contract.tokens < 0 { + return Err(BankError::NegativeTokens); + } + if *bal < contract.tokens { self.forget_signature_with_last_id(&tr.sig, &tr.last_id); return Err(BankError::InsufficientFunds(tr.from)); @@ -284,9 +306,7 @@ impl Bank { } }*/ - /// Apply only a transaction's credits. Credits from multiple transactions - /// may safely be applied in parallel. - fn apply_credits(&self, tx: &Transaction) { + fn apply_credits(&self, tx: &Transaction, balances: &mut HashMap) { match &tx.instruction { Instruction::NewContract(contract) => { let mut plan = contract.plan.clone(); @@ -295,7 +315,7 @@ impl Bank { .expect("timestamp creation in apply_credits"))); if let Some(payment) = plan.final_payment() { - self.apply_payment(&payment); + self.apply_payment(&payment, balances); } else { let mut pending = self.pending .write() @@ -315,28 +335,45 @@ impl Bank { /// Process a Transaction. If it contains a payment plan that requires a witness /// to progress, the payment plan will be stored in the bank. fn process_transaction(&self, tx: &Transaction) -> Result<()> { - self.apply_debits(tx)?; - self.apply_credits(tx); + let bals = &mut self.balances.write().unwrap(); + self.apply_debits(tx, bals)?; + self.apply_credits(tx, bals); + self.transaction_count.fetch_add(1, Ordering::Relaxed); Ok(()) } /// Process a batch of transactions. It runs all debits first to filter out any /// transactions that can't be processed in parallel deterministically. pub fn process_transactions(&self, txs: Vec) -> Vec> { - debug!("processing Transactions {}", txs.len()); - let results: Vec<_> = txs.into_par_iter() - .map(|tx| self.apply_debits(&tx).map(|_| tx)) + // Run all debits first to filter out any transactions that can't be processed + // in parallel deterministically. + let bals = &mut self.balances.write().unwrap(); + info!("processing Transactions {}", txs.len()); + let now = Instant::now(); + let results: Vec<_> = txs.into_iter() + .map(|tx| self.apply_debits(&tx, bals).map(|_| tx)) .collect(); // Calling collect() here forces all debits to complete before moving on. - results - .into_par_iter() + info!("debits: {:?}", now.elapsed()); + + let res: Vec<_> = results + .into_iter() .map(|result| { result.map(|tx| { - self.apply_credits(&tx); + self.apply_credits(&tx, bals); tx }) }) - .collect() + .collect(); + let mut tr_count = 0; + for r in &res { + if r.is_ok() { + tr_count += 1; + } + } + self.transaction_count.fetch_add(tr_count, Ordering::Relaxed); + info!("credits: {:?}", now.elapsed()); + res } /// Process an ordered list of entries. @@ -365,7 +402,7 @@ impl Bank { { e.get_mut().apply_witness(&Witness::Signature(from)); if let Some(payment) = e.get().final_payment() { - self.apply_payment(&payment); + self.apply_payment(&payment, &mut self.balances.write().unwrap()); e.remove_entry(); } }; @@ -414,7 +451,7 @@ impl Bank { .read() .expect("'last_time' read lock when creating timestamp"))); if let Some(payment) = plan.final_payment() { - self.apply_payment(&payment); + self.apply_payment(&payment, &mut self.balances.write().unwrap()); completed.push(key.clone()); } } @@ -461,7 +498,8 @@ impl Bank { .read() .expect("'balances' read lock in get_balance"); //bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) - bals.get(pubkey).map(|x| *x.read().unwrap()) + //bals.get(pubkey).map(|x| *x.read().unwrap()) + bals.get(pubkey).map(|x| *x) } pub fn transaction_count(&self) -> usize {