From 81706f2d755faf836e0a016967dc2073334ed4e4 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 16:27:40 -0600 Subject: [PATCH] Move write_service and drain_service into new write_stage module --- src/lib.rs | 1 + src/rpu.rs | 40 +++++--------------------- src/tvu.rs | 28 ++++-------------- src/write_stage.rs | 71 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 56 deletions(-) create mode 100644 src/write_stage.rs diff --git a/src/lib.rs b/src/lib.rs index e61b59f889..1c14c9320f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ pub mod thin_client; pub mod timing; pub mod transaction; pub mod tvu; +pub mod write_stage; extern crate bincode; extern crate byteorder; extern crate chrono; diff --git a/src/rpu.rs b/src/rpu.rs index cf43869d8d..0c0719fcd9 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -3,8 +3,6 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData}; -use entry::Entry; -use entry_writer::EntryWriter; use hash::Hash; use packet; use record_stage::RecordStage; @@ -14,12 +12,13 @@ use result::Result; use sig_verify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver}; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; use std::sync::{Arc, Mutex, RwLock}; -use std::thread::{spawn, JoinHandle}; +use std::thread::JoinHandle; use std::time::Duration; use streamer; +use write_stage::WriteStage; pub struct Rpu { bank: Arc, @@ -37,29 +36,6 @@ impl Rpu { } } - fn write_service( - bank: Arc, - exit: Arc, - broadcast: streamer::BlobSender, - blob_recycler: packet::BlobRecycler, - writer: Mutex, - entry_receiver: Receiver, - ) -> JoinHandle<()> { - spawn(move || loop { - let entry_writer = EntryWriter::new(&bank); - let _ = entry_writer.write_and_send_entries( - &broadcast, - &blob_recycler, - &writer, - &entry_receiver, - ); - if exit.load(Ordering::Relaxed) { - info!("broadcat_service exiting"); - break; - } - }) - } - /// Create a UDP microservice that forwards messages the given Rpu. /// This service is the network leader /// Set `exit` to shutdown its threads. @@ -106,11 +82,9 @@ impl Rpu { self.tick_duration, ); - let (broadcast_sender, broadcast_receiver) = channel(); - let t_write = Self::write_service( + let write_stage = WriteStage::new( self.bank.clone(), exit.clone(), - broadcast_sender, blob_recycler.clone(), Mutex::new(writer), record_stage.entry_receiver, @@ -122,7 +96,7 @@ impl Rpu { exit.clone(), crdt.clone(), blob_recycler.clone(), - broadcast_receiver, + write_stage.blob_receiver, ); let respond_socket = UdpSocket::bind(local.clone())?; @@ -137,7 +111,7 @@ impl Rpu { t_receiver, t_responder, request_stage.thread_hdl, - t_write, + write_stage.thread_hdl, t_gossip, t_listen, t_broadcast, diff --git a/src/tvu.rs b/src/tvu.rs index 21b2ca61f0..db77525e89 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -3,8 +3,6 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData}; -use entry::Entry; -use entry_writer::EntryWriter; use hash::Hash; use ledger; use packet; @@ -15,11 +13,12 @@ use result::Result; use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver}; +use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; +use write_stage::WriteStage; pub struct Tvu { bank: Arc, @@ -37,23 +36,6 @@ impl Tvu { } } - fn drain_service( - bank: Arc, - exit: Arc, - entry_receiver: Receiver, - ) -> JoinHandle<()> { - spawn(move || { - let entry_writer = EntryWriter::new(&bank); - loop { - let _ = entry_writer.drain_entries(&entry_receiver); - if exit.load(Ordering::Relaxed) { - info!("drain_service exiting"); - break; - } - } - }) - } - /// Process verified blobs, already in order /// Respond with a signed hash of the state fn replicate_state( @@ -188,8 +170,8 @@ impl Tvu { obj.tick_duration, ); - let t_write = - Self::drain_service(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); + let write_stage = + WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); let t_responder = streamer::responder( respond_socket, @@ -210,7 +192,7 @@ impl Tvu { t_packet_receiver, t_responder, request_stage.thread_hdl, - t_write, + write_stage.thread_hdl, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter()); Ok(threads) diff --git a/src/write_stage.rs b/src/write_stage.rs new file mode 100644 index 0000000000..fd25384677 --- /dev/null +++ b/src/write_stage.rs @@ -0,0 +1,71 @@ +//! The `write_stage` module implements write stage of the RPU. + +use bank::Bank; +use entry::Entry; +use entry_writer::EntryWriter; +use packet; +use std::io::Write; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver}; +use std::sync::{Arc, Mutex}; +use std::thread::{spawn, JoinHandle}; +use streamer; + +pub struct WriteStage { + pub thread_hdl: JoinHandle<()>, + pub blob_receiver: streamer::BlobReceiver, +} + +impl WriteStage { + /// Create a new Rpu that wraps the given Bank. + pub fn new( + bank: Arc, + exit: Arc, + blob_recycler: packet::BlobRecycler, + writer: Mutex, + entry_receiver: Receiver, + ) -> Self { + let (blob_sender, blob_receiver) = channel(); + let thread_hdl = spawn(move || loop { + let entry_writer = EntryWriter::new(&bank); + let _ = entry_writer.write_and_send_entries( + &blob_sender, + &blob_recycler, + &writer, + &entry_receiver, + ); + if exit.load(Ordering::Relaxed) { + info!("broadcat_service exiting"); + break; + } + }); + + WriteStage { + thread_hdl, + blob_receiver, + } + } + + pub fn new_drain( + bank: Arc, + exit: Arc, + entry_receiver: Receiver, + ) -> Self { + let (_blob_sender, blob_receiver) = channel(); + let thread_hdl = spawn(move || { + let entry_writer = EntryWriter::new(&bank); + loop { + let _ = entry_writer.drain_entries(&entry_receiver); + if exit.load(Ordering::Relaxed) { + info!("drain_service exiting"); + break; + } + } + }); + + WriteStage { + thread_hdl, + blob_receiver, + } + } +}