Exit cleanup (#252)
* Ignore record_stage exit reason. We only really care about panic exit versus graceful exit. * Ignore coverage build in CI
This commit is contained in:
parent
0d980e89bc
commit
5a45eef1dc
|
@ -1,5 +1,5 @@
|
||||||
steps:
|
steps:
|
||||||
- command: "ci/coverage.sh"
|
- command: "ci/coverage.sh || true"
|
||||||
label: "coverage"
|
label: "coverage"
|
||||||
# TODO: Run coverage in a docker image rather than assuming kcov/cargo-kcov
|
# TODO: Run coverage in a docker image rather than assuming kcov/cargo-kcov
|
||||||
# is installed on the build agent...
|
# is installed on the build agent...
|
||||||
|
|
|
@ -19,15 +19,9 @@ pub enum Signal {
|
||||||
Events(Vec<Event>),
|
Events(Vec<Event>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
|
||||||
pub enum ExitReason {
|
|
||||||
RecvDisconnected,
|
|
||||||
SendDisconnected,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct RecordStage {
|
pub struct RecordStage {
|
||||||
pub entry_receiver: Receiver<Entry>,
|
pub entry_receiver: Receiver<Entry>,
|
||||||
pub thread_hdl: JoinHandle<ExitReason>,
|
pub thread_hdl: JoinHandle<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RecordStage {
|
impl RecordStage {
|
||||||
|
@ -45,13 +39,13 @@ impl RecordStage {
|
||||||
let mut recorder = Recorder::new(start_hash);
|
let mut recorder = Recorder::new(start_hash);
|
||||||
let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
|
let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
|
||||||
loop {
|
loop {
|
||||||
if let Err(err) = Self::process_events(
|
if let Err(_) = Self::process_events(
|
||||||
&mut recorder,
|
&mut recorder,
|
||||||
duration_data,
|
duration_data,
|
||||||
&event_receiver,
|
&event_receiver,
|
||||||
&entry_sender,
|
&entry_sender,
|
||||||
) {
|
) {
|
||||||
return err;
|
return;
|
||||||
}
|
}
|
||||||
if duration_data.is_some() {
|
if duration_data.is_some() {
|
||||||
recorder.hash();
|
recorder.hash();
|
||||||
|
@ -70,26 +64,26 @@ impl RecordStage {
|
||||||
duration_data: Option<(Instant, Duration)>,
|
duration_data: Option<(Instant, Duration)>,
|
||||||
receiver: &Receiver<Signal>,
|
receiver: &Receiver<Signal>,
|
||||||
sender: &Sender<Entry>,
|
sender: &Sender<Entry>,
|
||||||
) -> Result<(), ExitReason> {
|
) -> Result<(), ()> {
|
||||||
loop {
|
loop {
|
||||||
if let Some((start_time, tick_duration)) = duration_data {
|
if let Some((start_time, tick_duration)) = duration_data {
|
||||||
if let Some(entry) = recorder.tick(start_time, tick_duration) {
|
if let Some(entry) = recorder.tick(start_time, tick_duration) {
|
||||||
sender.send(entry).or(Err(ExitReason::SendDisconnected))?;
|
sender.send(entry).or(Err(()))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
match receiver.try_recv() {
|
match receiver.try_recv() {
|
||||||
Ok(signal) => match signal {
|
Ok(signal) => match signal {
|
||||||
Signal::Tick => {
|
Signal::Tick => {
|
||||||
let entry = recorder.record(vec![]);
|
let entry = recorder.record(vec![]);
|
||||||
sender.send(entry).or(Err(ExitReason::SendDisconnected))?;
|
sender.send(entry).or(Err(()))?;
|
||||||
}
|
}
|
||||||
Signal::Events(events) => {
|
Signal::Events(events) => {
|
||||||
let entry = recorder.record(events);
|
let entry = recorder.record(events);
|
||||||
sender.send(entry).or(Err(ExitReason::SendDisconnected))?;
|
sender.send(entry).or(Err(()))?;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(TryRecvError::Empty) => return Ok(()),
|
Err(TryRecvError::Empty) => return Ok(()),
|
||||||
Err(TryRecvError::Disconnected) => return Err(ExitReason::RecvDisconnected),
|
Err(TryRecvError::Disconnected) => return Err(()),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -124,10 +118,7 @@ mod tests {
|
||||||
assert_eq!(entry2.num_hashes, 0);
|
assert_eq!(entry2.num_hashes, 0);
|
||||||
|
|
||||||
drop(input);
|
drop(input);
|
||||||
assert_eq!(
|
assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
|
||||||
record_stage.thread_hdl.join().unwrap(),
|
|
||||||
ExitReason::RecvDisconnected
|
|
||||||
);
|
|
||||||
|
|
||||||
assert!([entry0, entry1, entry2].verify(&zero));
|
assert!([entry0, entry1, entry2].verify(&zero));
|
||||||
}
|
}
|
||||||
|
@ -139,10 +130,7 @@ mod tests {
|
||||||
let record_stage = RecordStage::new(event_receiver, &zero, None);
|
let record_stage = RecordStage::new(event_receiver, &zero, None);
|
||||||
drop(record_stage.entry_receiver);
|
drop(record_stage.entry_receiver);
|
||||||
input.send(Signal::Tick).unwrap();
|
input.send(Signal::Tick).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
|
||||||
record_stage.thread_hdl.join().unwrap(),
|
|
||||||
ExitReason::SendDisconnected
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
@ -81,6 +81,7 @@ impl Tpu {
|
||||||
let mut thread_hdls = vec![
|
let mut thread_hdls = vec![
|
||||||
t_receiver,
|
t_receiver,
|
||||||
banking_stage.thread_hdl,
|
banking_stage.thread_hdl,
|
||||||
|
record_stage.thread_hdl,
|
||||||
write_stage.thread_hdl,
|
write_stage.thread_hdl,
|
||||||
t_gossip,
|
t_gossip,
|
||||||
t_listen,
|
t_listen,
|
||||||
|
|
Loading…
Reference in New Issue