diff --git a/src/accounting_stage.rs b/src/accounting_stage.rs index df2455e3f5..ecf1345277 100644 --- a/src/accounting_stage.rs +++ b/src/accounting_stage.rs @@ -66,22 +66,22 @@ mod tests { // Entry OR if the verifier tries to parallelize across multiple Entries. let mint = Mint::new(2); let acc = Accountant::new(&mint); - let stage = AccountingStage::new(acc, &mint.last_id(), None); + let accounting_stage = AccountingStage::new(acc, &mint.last_id(), None); // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); let events = vec![Event::Transaction(tr)]; - assert!(stage.process_events(events).is_ok()); + assert!(accounting_stage.process_events(events).is_ok()); // Process a second batch that spends one of those tokens. let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); let events = vec![Event::Transaction(tr)]; - assert!(stage.process_events(events).is_ok()); + assert!(accounting_stage.process_events(events).is_ok()); // Collect the ledger and feed it to a new accountant. - drop(stage.entry_sender); - let entries: Vec = stage.output.lock().unwrap().iter().collect(); + drop(accounting_stage.entry_sender); + let entries: Vec = accounting_stage.output.lock().unwrap().iter().collect(); // Assert the user holds one token, not two. If the server only output one // entry, then the second transaction will be rejected, because it drives @@ -156,17 +156,17 @@ mod bench { .collect(); let (input, event_receiver) = channel(); - let stage = AccountingStage::new(acc, &mint.last_id(), None); + let accounting_stage = AccountingStage::new(acc, &mint.last_id(), None); let now = Instant::now(); - assert!(stage.process_events(events).is_ok()); + assert!(accounting_stage.process_events(events).is_ok()); let duration = now.elapsed(); let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; let tps = txs as f64 / sec; // Ensure that all transactions were successfully logged. - drop(stage.historian_input); - let entries: Vec = stage.output.lock().unwrap().iter().collect(); + drop(accounting_stage.historian_input); + let entries: Vec = accounting_stage.output.lock().unwrap().iter().collect(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].events.len(), txs as usize); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index cddd4e1e7e..bea72d561d 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -115,9 +115,9 @@ fn main() { eprintln!("creating networking stack..."); - let accounting = AccountingStage::new(acc, &last_id, Some(1000)); + let accounting_stage = AccountingStage::new(acc, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); - let tpu = Arc::new(Tpu::new(accounting)); + let tpu = Arc::new(Tpu::new(accounting_stage)); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); diff --git a/src/thin_client.rs b/src/thin_client.rs index 74b7665d95..ad6ac2acea 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -182,8 +182,8 @@ mod tests { let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30)); - let acc = Arc::new(Tpu::new(accounting)); + let accounting_stage = AccountingStage::new(acc, &alice.last_id(), Some(30)); + let acc = Arc::new(Tpu::new(accounting_stage)); let threads = Tpu::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); @@ -218,8 +218,8 @@ mod tests { let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30)); - let tpu = Arc::new(Tpu::new(accounting)); + let accounting_stage = AccountingStage::new(acc, &alice.last_id(), Some(30)); + let tpu = Arc::new(Tpu::new(accounting_stage)); let serve_addr = leader_serve.local_addr().unwrap(); let threads = Tpu::serve( &tpu, @@ -287,14 +287,14 @@ mod tests { let leader_acc = { let acc = Accountant::new(&alice); - let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30)); - Arc::new(Tpu::new(accounting)) + let accounting_stage = AccountingStage::new(acc, &alice.last_id(), Some(30)); + Arc::new(Tpu::new(accounting_stage)) }; let replicant_acc = { let acc = Accountant::new(&alice); - let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30)); - Arc::new(Tpu::new(accounting)) + let accounting_stage = AccountingStage::new(acc, &alice.last_id(), Some(30)); + Arc::new(Tpu::new(accounting_stage)) }; let leader_threads = Tpu::serve( diff --git a/src/tpu.rs b/src/tpu.rs index 91ff470948..950bafc4e4 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -30,7 +30,7 @@ use thin_client_service::{Request, Response, ThinClientService}; use timing; pub struct Tpu { - accounting: AccountingStage, + accounting_stage: AccountingStage, thin_client_service: ThinClientService, } @@ -38,17 +38,17 @@ type SharedTpu = Arc; impl Tpu { /// Create a new Tpu that wraps the given Accountant. - pub fn new(accounting: AccountingStage) -> Self { - let thin_client_service = ThinClientService::new(accounting.acc.clone()); + pub fn new(accounting_stage: AccountingStage) -> Self { + let thin_client_service = ThinClientService::new(accounting_stage.acc.clone()); Tpu { - accounting, + accounting_stage, thin_client_service, } } fn update_entry(obj: &Tpu, writer: &Mutex, entry: &Entry) { trace!("update_entry entry"); - obj.accounting.acc.register_entry_id(&entry.id); + obj.accounting_stage.acc.register_entry_id(&entry.id); writeln!( writer.lock().unwrap(), "{}", @@ -61,14 +61,14 @@ impl Tpu { fn receive_all(obj: &Tpu, writer: &Mutex) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; - let entry = obj.accounting + let entry = obj.accounting_stage .output .lock() .unwrap() .recv_timeout(Duration::new(1, 0))?; Self::update_entry(obj, writer, &entry); l.push(entry); - while let Ok(entry) = obj.accounting.output.lock().unwrap().try_recv() { + while let Ok(entry) = obj.accounting_stage.output.lock().unwrap().try_recv() { Self::update_entry(obj, writer, &entry); l.push(entry); } @@ -338,7 +338,7 @@ impl Tpu { debug!("events: {} reqs: {}", events.len(), reqs.len()); debug!("process_events"); - obj.accounting.process_events(events)?; + obj.accounting_stage.process_events(events)?; debug!("done process_events"); debug!("process_requests"); @@ -378,7 +378,7 @@ impl Tpu { for msgs in &blobs { let blob = msgs.read().unwrap(); let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); - let acc = &obj.accounting.acc; + let acc = &obj.accounting_stage.acc; for entry in entries { acc.register_entry_id(&entry.id); for result in acc.process_verified_events(entry.events) { @@ -463,7 +463,8 @@ impl Tpu { Mutex::new(writer), ); - let t_skinny = Self::thin_client_service(obj.accounting.acc.clone(), exit.clone(), skinny); + let t_skinny = + Self::thin_client_service(obj.accounting_stage.acc.clone(), exit.clone(), skinny); let tpu = obj.clone(); let t_server = spawn(move || loop { @@ -787,8 +788,8 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); let acc = Accountant::new(&alice); - let accounting = AccountingStage::new(acc, &alice.last_id(), Some(30)); - let tpu = Arc::new(Tpu::new(accounting)); + let accounting_stage = AccountingStage::new(acc, &alice.last_id(), Some(30)); + let tpu = Arc::new(Tpu::new(accounting_stage)); let replicate_addr = target1_data.replicate_addr; let threads = Tpu::replicate( &tpu, @@ -813,7 +814,7 @@ mod tests { w.set_index(i).unwrap(); w.set_id(leader_id).unwrap(); - let acc = &tpu.accounting.acc; + let acc = &tpu.accounting_stage.acc; let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); @@ -855,7 +856,7 @@ mod tests { msgs.push(msg); } - let acc = &tpu.accounting.acc; + let acc = &tpu.accounting_stage.acc; let alice_balance = acc.get_balance(&alice.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance);