This commit is contained in:
Anatoly Yakovenko 2018-05-30 12:07:28 -07:00 committed by Greg Fitzgerald
parent f5eedd2d19
commit e44e81bd17
5 changed files with 28 additions and 56 deletions

View File

@ -70,8 +70,7 @@ impl Bank {
/// Commit funds to the 'to' party. /// Commit funds to the 'to' party.
fn apply_payment(&self, payment: &Payment) { fn apply_payment(&self, payment: &Payment) {
// First we check balances with a read lock to maximize potential parallelization. // First we check balances with a read lock to maximize potential parallelization.
if self if self.balances
.balances
.read() .read()
.expect("'balances' read lock in apply_payment") .expect("'balances' read lock in apply_payment")
.contains_key(&payment.to) .contains_key(&payment.to)
@ -120,8 +119,7 @@ impl Bank {
} }
fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) { fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) {
if let Some(entry) = self if let Some(entry) = self.last_ids
.last_ids
.read() .read()
.expect("'last_ids' read lock in forget_signature_with_last_id") .expect("'last_ids' read lock in forget_signature_with_last_id")
.iter() .iter()
@ -133,8 +131,7 @@ impl Bank {
} }
fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> Result<()> { fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> Result<()> {
if let Some(entry) = self if let Some(entry) = self.last_ids
.last_ids
.read() .read()
.expect("'last_ids' read lock in reserve_signature_with_last_id") .expect("'last_ids' read lock in reserve_signature_with_last_id")
.iter() .iter()
@ -151,8 +148,7 @@ impl Bank {
/// the oldest ones once its internal cache is full. Once boot, the /// the oldest ones once its internal cache is full. Once boot, the
/// bank will reject transactions using that `last_id`. /// bank will reject transactions using that `last_id`.
pub fn register_entry_id(&self, last_id: &Hash) { pub fn register_entry_id(&self, last_id: &Hash) {
let mut last_ids = self let mut last_ids = self.last_ids
.last_ids
.write() .write()
.expect("'last_ids' write lock in register_entry_id"); .expect("'last_ids' write lock in register_entry_id");
if last_ids.len() >= MAX_ENTRY_IDS { if last_ids.len() >= MAX_ENTRY_IDS {
@ -170,8 +166,7 @@ impl Bank {
return Err(BankError::NegativeTokens); return Err(BankError::NegativeTokens);
} }
} }
let bals = self let bals = self.balances
.balances
.read() .read()
.expect("'balances' read lock in apply_debits"); .expect("'balances' read lock in apply_debits");
let option = bals.get(&tx.from); let option = bals.get(&tx.from);
@ -216,18 +211,14 @@ impl Bank {
match &tx.instruction { match &tx.instruction {
Instruction::NewContract(contract) => { Instruction::NewContract(contract) => {
let mut plan = contract.plan.clone(); let mut plan = contract.plan.clone();
plan.apply_witness(&Witness::Timestamp( plan.apply_witness(&Witness::Timestamp(*self.last_time
*self .read()
.last_time .expect("timestamp creation in apply_credits")));
.read()
.expect("timestamp creation in apply_credits"),
));
if let Some(ref payment) = plan.final_payment() { if let Some(ref payment) = plan.final_payment() {
self.apply_payment(payment); self.apply_payment(payment);
} else { } else {
let mut pending = self let mut pending = self.pending
.pending
.write() .write()
.expect("'pending' write lock in apply_credits"); .expect("'pending' write lock in apply_credits");
pending.insert(tx.sig, plan); pending.insert(tx.sig, plan);
@ -254,8 +245,7 @@ impl Bank {
// Run all debits first to filter out any transactions that can't be processed // Run all debits first to filter out any transactions that can't be processed
// in parallel deterministically. // in parallel deterministically.
info!("processing Transactions {}", txs.len()); info!("processing Transactions {}", txs.len());
let results: Vec<_> = txs let results: Vec<_> = txs.into_par_iter()
.into_par_iter()
.map(|tx| self.apply_debits(&tx).map(|_| tx)) .map(|tx| self.apply_debits(&tx).map(|_| tx))
.collect(); // Calling collect() here forces all debits to complete before moving on. .collect(); // Calling collect() here forces all debits to complete before moving on.
@ -282,8 +272,7 @@ impl Bank {
/// Process a Witness Signature. /// Process a Witness Signature.
fn apply_signature(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { fn apply_signature(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
if let Occupied(mut e) = self if let Occupied(mut e) = self.pending
.pending
.write() .write()
.expect("write() in apply_signature") .expect("write() in apply_signature")
.entry(tx_sig) .entry(tx_sig)
@ -302,8 +291,7 @@ impl Bank {
fn apply_timestamp(&self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> { fn apply_timestamp(&self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> {
// If this is the first timestamp we've seen, it probably came from the genesis block, // If this is the first timestamp we've seen, it probably came from the genesis block,
// so we'll trust it. // so we'll trust it.
if *self if *self.last_time
.last_time
.read() .read()
.expect("'last_time' read lock on first timestamp check") .expect("'last_time' read lock on first timestamp check")
== Utc.timestamp(0, 0) == Utc.timestamp(0, 0)
@ -314,8 +302,7 @@ impl Bank {
.insert(from); .insert(from);
} }
if self if self.time_sources
.time_sources
.read() .read()
.expect("'time_sources' read lock") .expect("'time_sources' read lock")
.contains(&from) .contains(&from)
@ -332,17 +319,13 @@ impl Bank {
// Hold 'pending' write lock until the end of this function. Otherwise another thread can // Hold 'pending' write lock until the end of this function. Otherwise another thread can
// double-spend if it enters before the modified plan is removed from 'pending'. // double-spend if it enters before the modified plan is removed from 'pending'.
let mut pending = self let mut pending = self.pending
.pending
.write() .write()
.expect("'pending' write lock in apply_timestamp"); .expect("'pending' write lock in apply_timestamp");
for (key, plan) in pending.iter_mut() { for (key, plan) in pending.iter_mut() {
plan.apply_witness(&Witness::Timestamp( plan.apply_witness(&Witness::Timestamp(*self.last_time
*self .read()
.last_time .expect("'last_time' read lock when creating timestamp")));
.read()
.expect("'last_time' read lock when creating timestamp"),
));
if let Some(ref payment) = plan.final_payment() { if let Some(ref payment) = plan.final_payment() {
self.apply_payment(payment); self.apply_payment(payment);
completed.push(key.clone()); completed.push(key.clone());
@ -387,8 +370,7 @@ impl Bank {
} }
pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> { pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
let bals = self let bals = self.balances
.balances
.read() .read()
.expect("'balances' read lock in get_balance"); .expect("'balances' read lock in get_balance");
bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64)
@ -530,8 +512,7 @@ mod tests {
let bank = Bank::new(&mint); let bank = Bank::new(&mint);
let pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
let sig = bank let sig = bank.transfer_on_date(1, &mint.keypair(), pubkey, dt, mint.last_id())
.transfer_on_date(1, &mint.keypair(), pubkey, dt, mint.last_id())
.unwrap(); .unwrap();
// Assert the debit counts as a transaction. // Assert the debit counts as a transaction.

View File

@ -326,8 +326,7 @@ impl Crdt {
} }
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) { fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
//trace!("get updates since {}", v); //trace!("get updates since {}", v);
let data = self let data = self.table
.table
.values() .values()
.filter(|x| self.local[&x.id] > v) .filter(|x| self.local[&x.id] > v)
.cloned() .cloned()
@ -339,8 +338,7 @@ impl Crdt {
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> { pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let valid: Vec<_> = self let valid: Vec<_> = self.table
.table
.values() .values()
.filter(|r| r.id != self.me && r.replicate_addr != daddr) .filter(|r| r.id != self.me && r.replicate_addr != daddr)
.collect(); .collect();
@ -393,8 +391,7 @@ impl Crdt {
// Lock the object only to do this operation and not for any longer // Lock the object only to do this operation and not for any longer
// especially not when doing the `sock.send_to` // especially not when doing the `sock.send_to`
let (remote_gossip_addr, req) = obj let (remote_gossip_addr, req) = obj.read()
.read()
.expect("'obj' read lock in fn run_gossip") .expect("'obj' read lock in fn run_gossip")
.gossip_request()?; .gossip_request()?;
// TODO this will get chatty, so we need to first ask for number of updates since // TODO this will get chatty, so we need to first ask for number of updates since
@ -486,8 +483,7 @@ impl Crdt {
trace!("RequestUpdates {}", v); trace!("RequestUpdates {}", v);
let addr = reqdata.gossip_addr; let addr = reqdata.gossip_addr;
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from` // only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = obj let (from, ups, data) = obj.read()
.read()
.expect("'obj' read lock in RequestUpdates") .expect("'obj' read lock in RequestUpdates")
.get_updates_since(v); .get_updates_since(v);
trace!("get updates since response {} {}", v, data.len()); trace!("get updates since response {} {}", v, data.len());
@ -558,8 +554,7 @@ impl Crdt {
while let Ok(mut more) = requests_receiver.try_recv() { while let Ok(mut more) = requests_receiver.try_recv() {
reqs.append(&mut more); reqs.append(&mut more);
} }
let resp: VecDeque<_> = reqs let resp: VecDeque<_> = reqs.iter()
.iter()
.filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap())) .filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap()))
.collect(); .collect();
response_sender.send(resp)?; response_sender.send(resp)?;

View File

@ -29,8 +29,7 @@ fn recv_loop(
let msgs = re.allocate(); let msgs = re.allocate();
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
loop { loop {
match msgs match msgs.write()
.write()
.expect("write lock in fn recv_loop") .expect("write lock in fn recv_loop")
.recv_from(sock) .recv_from(sock)
{ {
@ -201,8 +200,7 @@ fn recv_window(
) -> Result<()> { ) -> Result<()> {
let timer = Duration::from_millis(200); let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?; let mut dq = r.recv_timeout(timer)?;
let leader_id = crdt let leader_id = crdt.read()
.read()
.expect("'crdt' read lock in fn recv_window") .expect("'crdt' read lock in fn recv_window")
.leader_data() .leader_data()
.id; .id;

View File

@ -155,8 +155,7 @@ impl Transaction {
pub fn verify_plan(&self) -> bool { pub fn verify_plan(&self) -> bool {
if let Instruction::NewContract(contract) = &self.instruction { if let Instruction::NewContract(contract) = &self.instruction {
self.fee >= 0 self.fee >= 0 && self.fee <= contract.tokens
&& self.fee <= contract.tokens
&& contract.plan.verify(contract.tokens - self.fee) && contract.plan.verify(contract.tokens - self.fee)
} else { } else {
true true

View File

@ -150,8 +150,7 @@ pub fn crdt_retransmit() {
trace!("waiting to converge:"); trace!("waiting to converge:");
let mut done = false; let mut done = false;
for _ in 0..30 { for _ in 0..30 {
done = c1.read().unwrap().table.len() == 3 done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3
&& c2.read().unwrap().table.len() == 3
&& c3.read().unwrap().table.len() == 3; && c3.read().unwrap().table.len() == 3;
if done { if done {
break; break;