Cleanup TVU code to look like its block diagram (#1737)

* Reorg TVU code to look like TVU diagram

And move channel creation into LedgerWriteStage so that it can
be used in the same was as all the other stages.

* Delete commented out code
This commit is contained in:
Greg Fitzgerald 2018-11-07 19:25:36 -07:00 committed by GitHub
parent 25dd5145bb
commit 2a6046de8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 33 deletions

View File

@ -8,7 +8,7 @@ use log::Level;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use timing::duration_as_ms; use timing::duration_as_ms;
@ -21,7 +21,7 @@ impl LedgerWriteStage {
pub fn write( pub fn write(
ledger_writer: Option<&mut LedgerWriter>, ledger_writer: Option<&mut LedgerWriter>,
entry_receiver: &EntryReceiver, entry_receiver: &EntryReceiver,
forwarder: &Option<EntrySender>, entry_sender: &EntrySender,
) -> Result<()> { ) -> Result<()> {
let mut ventries = Vec::new(); let mut ventries = Vec::new();
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
@ -44,10 +44,8 @@ impl LedgerWriteStage {
} }
inc_new_counter_info!("ledger_writer_stage-entries_received", num_new_entries); inc_new_counter_info!("ledger_writer_stage-entries_received", num_new_entries);
if let Some(forwarder) = forwarder { for entries in ventries {
for entries in ventries { entry_sender.send(entries)?;
forwarder.send(entries)?;
}
} }
inc_new_counter_info!( inc_new_counter_info!(
"ledger_writer_stage-time_ms", "ledger_writer_stage-time_ms",
@ -56,17 +54,15 @@ impl LedgerWriteStage {
Ok(()) Ok(())
} }
pub fn new( pub fn new(ledger_path: Option<&str>, entry_receiver: EntryReceiver) -> (Self, EntryReceiver) {
ledger_path: Option<&str>,
entry_receiver: EntryReceiver,
forwarder: Option<EntrySender>,
) -> Self {
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap()); let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap());
let (entry_sender, entry_forwarder) = channel();
let write_thread = Builder::new() let write_thread = Builder::new()
.name("solana-ledger-writer".to_string()) .name("solana-ledger-writer".to_string())
.spawn(move || loop { .spawn(move || loop {
if let Err(e) = Self::write(ledger_writer.as_mut(), &entry_receiver, &forwarder) { if let Err(e) = Self::write(ledger_writer.as_mut(), &entry_receiver, &entry_sender)
{
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
break; break;
@ -83,7 +79,7 @@ impl LedgerWriteStage {
}; };
}).unwrap(); }).unwrap();
LedgerWriteStage { write_thread } (LedgerWriteStage { write_thread }, entry_forwarder)
} }
} }

View File

@ -12,7 +12,6 @@ use service::Service;
use sigverify_stage::SigVerifyStage; use sigverify_stage::SigVerifyStage;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
@ -54,9 +53,8 @@ impl Tpu {
max_tick_height, max_tick_height,
); );
let (ledger_entry_sender, entry_forwarder) = channel(); let (ledger_write_stage, entry_forwarder) =
let ledger_write_stage = LedgerWriteStage::new(Some(ledger_path), entry_receiver);
LedgerWriteStage::new(Some(ledger_path), entry_receiver, Some(ledger_entry_sender));
let tpu = Tpu { let tpu = Tpu {
fetch_stage, fetch_stage,

View File

@ -21,7 +21,6 @@ use service::Service;
use signature::Keypair; use signature::Keypair;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use storage_stage::{StorageStage, StorageState}; use storage_stage::{StorageStage, StorageState};
@ -90,19 +89,8 @@ impl Tvu {
bank.leader_scheduler.clone(), bank.leader_scheduler.clone(),
); );
let (storage_entry_sender, storage_entry_receiver) = channel();
let storage_state = StorageState::new();
let storage_stage = StorageStage::new(
&storage_state,
storage_entry_receiver,
ledger_path,
keypair.clone(),
exit.clone(),
entry_height,
);
let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new( let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new(
keypair, keypair.clone(),
vote_account_keypair, vote_account_keypair,
bank.clone(), bank.clone(),
cluster_info, cluster_info,
@ -111,10 +99,17 @@ impl Tvu {
entry_height, entry_height,
); );
let ledger_write_stage = LedgerWriteStage::new( let (ledger_write_stage, storage_entry_receiver) =
LedgerWriteStage::new(ledger_path, ledger_entry_receiver);
let storage_state = StorageState::new();
let storage_stage = StorageStage::new(
&storage_state,
storage_entry_receiver,
ledger_path, ledger_path,
ledger_entry_receiver, keypair,
Some(storage_entry_sender), exit.clone(),
entry_height,
); );
Tvu { Tvu {