From 3f10bf44db2de96b6204461333e54db30554f6f2 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 14:12:36 -0600 Subject: [PATCH] Config recorder with any kind of Duration, not just milliseconds --- src/bin/testnode.rs | 4 +++- src/event_processor.rs | 9 +++++---- src/historian.rs | 17 ++++++++--------- src/recorder.rs | 9 ++++----- src/rpu.rs | 2 +- src/thin_client.rs | 24 ++++++++++++++++++++---- src/tvu.rs | 8 ++++++-- 7 files changed, 47 insertions(+), 26 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 74056b13b8..b298ca4157 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -19,6 +19,7 @@ use std::net::UdpSocket; use std::process::exit; use std::sync::Arc; use std::sync::atomic::AtomicBool; +use std::time::Duration; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); @@ -115,7 +116,8 @@ fn main() { eprintln!("creating networking stack..."); - let event_processor = EventProcessor::new(accountant, &last_id, Some(1000)); + let event_processor = + EventProcessor::new(accountant, &last_id, Some(Duration::from_millis(1000))); let exit = Arc::new(AtomicBool::new(false)); let rpu = Rpu::new(event_processor); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); diff --git a/src/event_processor.rs b/src/event_processor.rs index 07fc66b10a..4f30def1f7 100644 --- a/src/event_processor.rs +++ b/src/event_processor.rs @@ -9,26 +9,27 @@ use recorder::Signal; use result::Result; use std::sync::mpsc::{channel, Sender}; use std::sync::{Arc, Mutex}; +use std::time::Duration; pub struct EventProcessor { pub accountant: Arc, historian_input: Mutex>, historian: Mutex, pub start_hash: Hash, - pub ms_per_tick: Option, + pub tick_duration: Option, } impl EventProcessor { /// Create a new stage of the TPU for event and transaction processing - pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option) -> Self { + pub fn new(accountant: Accountant, start_hash: &Hash, tick_duration: Option) -> Self { let (historian_input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, start_hash, ms_per_tick); + let historian = Historian::new(event_receiver, start_hash, tick_duration); EventProcessor { accountant: Arc::new(accountant), historian_input: Mutex::new(historian_input), historian: Mutex::new(historian), start_hash: *start_hash, - ms_per_tick, + tick_duration, } } diff --git a/src/historian.rs b/src/historian.rs index e7234ece56..553d3ff6ef 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -6,7 +6,7 @@ use hash::Hash; use recorder::{ExitReason, Recorder, Signal}; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::thread::{spawn, JoinHandle}; -use std::time::Instant; +use std::time::{Duration, Instant}; pub struct Historian { pub entry_receiver: Receiver, @@ -17,11 +17,11 @@ impl Historian { pub fn new( event_receiver: Receiver, start_hash: &Hash, - ms_per_tick: Option, + tick_duration: Option, ) -> Self { let (entry_sender, entry_receiver) = channel(); let thread_hdl = - Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); + Self::create_recorder(*start_hash, tick_duration, event_receiver, entry_sender); Historian { entry_receiver, thread_hdl, @@ -32,18 +32,18 @@ impl Historian { /// sending back Entry messages until either the receiver or sender channel is closed. fn create_recorder( start_hash: Hash, - ms_per_tick: Option, + tick_duration: Option, receiver: Receiver, sender: Sender, ) -> JoinHandle { spawn(move || { let mut recorder = Recorder::new(receiver, sender, start_hash); - let now = Instant::now(); + let duration_data = tick_duration.map(|dur| (Instant::now(), dur)); loop { - if let Err(err) = recorder.process_events(now, ms_per_tick) { + if let Err(err) = recorder.process_events(duration_data) { return err; } - if ms_per_tick.is_some() { + if duration_data.is_some() { recorder.hash(); } } @@ -60,7 +60,6 @@ mod tests { use super::*; use ledger::Block; use std::thread::sleep; - use std::time::Duration; #[test] fn test_historian() { @@ -109,7 +108,7 @@ mod tests { fn test_ticking_historian() { let (input, event_receiver) = channel(); let zero = Hash::default(); - let hist = Historian::new(event_receiver, &zero, Some(20)); + let hist = Historian::new(event_receiver, &zero, Some(Duration::from_millis(20))); sleep(Duration::from_millis(900)); input.send(Signal::Tick).unwrap(); drop(input); diff --git a/src/recorder.rs b/src/recorder.rs index 208d62b6ed..3866747fa2 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -57,12 +57,11 @@ impl Recorder { pub fn process_events( &mut self, - epoch: Instant, - ms_per_tick: Option, + duration_data: Option<(Instant, Duration)>, ) -> Result<(), ExitReason> { loop { - if let Some(ms) = ms_per_tick { - if epoch.elapsed() > Duration::from_millis(ms) * (self.num_ticks + 1) { + if let Some((start_time, tick_duration)) = duration_data { + if start_time.elapsed() > tick_duration * (self.num_ticks + 1) { self.record_entry(vec![])?; // TODO: don't let this overflow u32 self.num_ticks += 1; @@ -105,7 +104,7 @@ mod tests { signal_sender .send(Signal::Events(vec![event0, event1])) .unwrap(); - recorder.process_events(Instant::now(), None).unwrap(); + recorder.process_events(None).unwrap(); drop(recorder.sender); let entries: Vec<_> = entry_receiver.iter().collect(); diff --git a/src/rpu.rs b/src/rpu.rs index b5127c1603..0690d6069f 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -98,7 +98,7 @@ impl Rpu { let historian_stage = Historian::new( request_stage.signal_receiver, &self.event_processor.start_hash, - self.event_processor.ms_per_tick, + self.event_processor.tick_duration, ); let (broadcast_sender, broadcast_receiver) = channel(); diff --git a/src/thin_client.rs b/src/thin_client.rs index 2bc2908432..87cc37b856 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -191,7 +191,11 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let event_processor = EventProcessor::new( + accountant, + &alice.last_id(), + Some(Duration::from_millis(30)), + ); let rpu = Rpu::new(event_processor); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(900)); @@ -230,7 +234,11 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let event_processor = EventProcessor::new( + accountant, + &alice.last_id(), + Some(Duration::from_millis(30)), + ); let rpu = Rpu::new(event_processor); let serve_addr = leader_serve.local_addr().unwrap(); let threads = rpu.serve( @@ -300,13 +308,21 @@ mod tests { let leader_acc = { let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let event_processor = EventProcessor::new( + accountant, + &alice.last_id(), + Some(Duration::from_millis(30)), + ); Rpu::new(event_processor) }; let replicant_acc = { let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let event_processor = EventProcessor::new( + accountant, + &alice.last_id(), + Some(Duration::from_millis(30)), + ); Arc::new(Tvu::new(event_processor)) }; diff --git a/src/tvu.rs b/src/tvu.rs index 39e0498d45..f05e9c2dc0 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -183,7 +183,7 @@ impl Tvu { let historian_stage = Historian::new( request_stage.signal_receiver, &obj.event_processor.start_hash, - obj.event_processor.ms_per_tick, + obj.event_processor.tick_duration, ); let t_write = Self::drain_service( @@ -311,7 +311,11 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let event_processor = EventProcessor::new( + accountant, + &alice.last_id(), + Some(Duration::from_millis(30)), + ); let tvu = Arc::new(Tvu::new(event_processor)); let replicate_addr = target1_data.replicate_addr; let threads = Tvu::serve(